Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions apis/python/src/tiledbsoma/soma_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ def read(
# TODO: batch_size
# TODO: partition,
# TODO: platform_config,
) -> Iterator[pa.RecordBatch]:
) -> Iterator[pa.Table]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not type the ids (aka coordinates) parameter to something more specific? See for example SOMADenseNdArray.

I think you can literally use the SOMADenseCoordinates type (in types.py) as this is a dense dataframe, which only takes a scalar or slice.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we need to decide on whether or not ids is optional. The behavior is inconsistent between DataFrame and NdArray - in the former, None is "all", in the latter, None is not accepted (only the explicit slice(None) is accepted).

CC: @thetorpedodog - if you want to propose a standard convention, we can pull it through the entire package.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to make it not an Optional value, we can easily preserve the "get everything if unspecified" behavior by doing

ids: Union[Sequence[int], slice] = slice(None)

I like this solution because it ends up with an explicit default with already-specified semantics.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkmartinjr @thetorpedodog thanks!

Optionality of ids is orthogonal to this PR which is solely pa.RecordBatch -> pa.Table, and has been open long enough, and rebased enough times -- I created #374 to track that

"""
Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.RecordBatch``.
Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.Table``.

:param ids: Which rows to read. Defaults to ``None``, meaning no constraint -- all rows.

Expand Down Expand Up @@ -217,18 +217,12 @@ def read(
else:
iterator = query.df[ids]

for df in iterator:
batches = df.to_batches()
for batch in batches:
# XXX COMMENT MORE
# This is the 'decode on read' part of our logic; in dim_select we have the
# 'encode on write' part.
# Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99.
#
# Also: don't materialize these on read
# TODO: get the arrow syntax for drop
# df.drop(ROWID, axis=1)
yield util_arrow.ascii_to_unicode_pyarrow_readback(batch)
for table in iterator:
# XXX COMMENT MORE
# This is the 'decode on read' part of our logic; in dim_select we have the
# 'encode on write' part.
# Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99.
yield util_arrow.ascii_to_unicode_pyarrow_readback(table)

def read_all(
self,
Expand All @@ -243,11 +237,11 @@ def read_all(
# TODO: partition,
# TODO: result_order,
# TODO: platform_config,
) -> pa.RecordBatch:
) -> pa.Table:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ids parameter needs a type. See comment on read

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#374 also

"""
This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the record batches found. Its nominal use is to simply unit-test cases.
This is a convenience method around ``read``. It concatenates all partial read results into a single Table. Its nominal use is to simplify unit-test cases.
"""
return util_arrow.concat_batches(
return pa.concat_tables(
self.read(
ids=ids,
value_filter=value_filter,
Expand All @@ -273,13 +267,13 @@ def _get_is_sparse(self) -> bool:

return self._cached_is_sparse

def write(self, values: pa.RecordBatch) -> None:
def write(self, values: pa.Table) -> None:
"""
Write an Arrow.RecordBatch to the persistent object.
Write an Arrow.Table to the persistent object.

:param values: An Arrow.RecordBatch containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``.
:param values: An Arrow.Table containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``.

The ``values`` Arrow RecordBatch must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written.
The ``values`` Arrow Table must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written.
"""
self._shape = None # cache-invalidate

Expand Down Expand Up @@ -392,7 +386,7 @@ def read_as_pandas_all(
id_column_name: Optional[str] = None,
) -> pd.DataFrame:
"""
Reads from SOMA storage into memory. Iterates over batches from ``read_as_pandas``, concatenating the output into a single dataframe. Convenient for unit-test use; also, handy whenever you're certain that the data being queried can be read entirely into memory.
This is a convenience method around ``read``. It concatenates all partial read results into a single DataFrame. Its nominal use is to simplify unit-test cases.
"""
dataframes = []
generator = self.read_as_pandas(
Expand Down
30 changes: 14 additions & 16 deletions apis/python/src/tiledbsoma/soma_indexed_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ def read(
column_names: Optional[Sequence[str]] = None,
result_order: Optional[SOMAResultOrder] = None,
# TODO: more arguments
) -> Iterator[pa.RecordBatch]:
) -> Iterator[pa.Table]:
"""
Read a user-defined subset of data, addressed by the dataframe indexing columns, optionally filtered, and return results as one or more Arrow.RecordBatch.
Read a user-defined subset of data, addressed by the dataframe indexing columns, optionally filtered, and return results as one or more Arrow.Table.

:param ids: for each index dimension, which rows to read. Defaults to ``None``, meaning no constraint -- all IDs.

Expand Down Expand Up @@ -258,14 +258,12 @@ def read(
else:
iterator = query.df[ids]

for df in iterator:
batches = df.to_batches()
for batch in batches:
# XXX COMMENT MORE
# This is the 'decode on read' part of our logic; in dim_select we have the
# 'encode on write' part.
# Context: # https://github.com/single-cell-data/TileDB-SOMA/issues/99.
yield util_arrow.ascii_to_unicode_pyarrow_readback(batch)
for table in iterator:
# XXX COMMENT MORE
# This is the 'decode on read' part of our logic; in dim_select we have the
# 'encode on write' part.
# Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99.
yield util_arrow.ascii_to_unicode_pyarrow_readback(table)

def read_all(
self,
Expand All @@ -279,19 +277,19 @@ def read_all(
# TODO: batch_size
# TODO: partition,
# TODO: platform_config,
) -> pa.RecordBatch:
) -> pa.Table:
"""
This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the record batches found. Its nominal use is to simply unit-test cases.
This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the table-pieces found. Its nominal use is to simplify unit-test cases.
"""
return util_arrow.concat_batches(
return pa.concat_tables(
self.read(ids=ids, value_filter=value_filter, column_names=column_names)
)

def write(self, values: pa.RecordBatch) -> None:
def write(self, values: pa.Table) -> None:
"""
Write an Arrow.RecordBatch to the persistent object. As duplicate index values are not allowed, index values already present in the object are overwritten and new index values are added.
Write an Arrow.Table to the persistent object. As duplicate index values are not allowed, index values already present in the object are overwritten and new index values are added.

:param values: An Arrow.RecordBatch containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMAIndexedDataFrame``.
:param values: An Arrow.Table containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMAIndexedDataFrame``.
"""
self._shape = None # cache-invalidate

Expand Down
32 changes: 10 additions & 22 deletions apis/python/src/tiledbsoma/util_arrow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator, Optional, Union
from typing import Optional, Union

import numpy as np
import pyarrow as pa
Expand Down Expand Up @@ -121,36 +121,24 @@ def get_arrow_schema_from_tiledb_uri(
return pa.schema(arrow_schema_dict)


def ascii_to_unicode_pyarrow_readback(record_batch: pa.RecordBatch) -> pa.RecordBatch:
def ascii_to_unicode_pyarrow_readback(table: pa.Table) -> pa.Table:
"""
Implements the 'decode on read' part of our ASCII/Unicode logic
"""
# TODO: COMMENT/LINK HEAVILY
names = [ofield.name for ofield in record_batch.schema]
names = [ofield.name for ofield in table.schema]
new_fields = []
for name in names:
old_field = record_batch[name]
if isinstance(old_field, pa.LargeBinaryArray):
old_field = table[name]
# Preferred syntax:
# if len(old_field) > 0 and pa.types.is_large_binary(old_field[0]):
# but:
# AttributeError: 'pyarrow.lib.UInt64Scalar' object has no attribute 'id'
if len(old_field) > 0 and isinstance(old_field[0], pa.LargeBinaryScalar):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the preferred pyarrow approach is to use the pyarrow.types helper functions rather than isinstance. in this case, I believe that would be pyarrow.types.is_large_binary

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkmartinjr will do, but recall this is going away on the very next PR in this stack (#359)

nfield = pa.array(
[element.as_py().decode("utf-8") for element in old_field]
)
new_fields.append(nfield)
else:
new_fields.append(old_field)
return pa.RecordBatch.from_arrays(new_fields, names=names)


def concat_batches(batch_generator: Iterator[pa.RecordBatch]) -> pa.RecordBatch:
"""
Iterates a generator of ``pyarrow.RecordBatch`` (e.g. ``SOMADataFrame.read``) and returns a concatenation of all the record batches found. The nominal use is to simply unit-test cases.
"""
batches = []
for batch in batch_generator:
batches.append(batch)
assert len(batches) > 0
names = [field.name for field in batches[0].schema]
arrays = []
for name in names:
array = pa.concat_arrays([batch[name] for batch in batches])
arrays.append(array)
return pa.RecordBatch.from_arrays(arrays, names=names)
return pa.Table.from_arrays(new_fields, names=names)
2 changes: 1 addition & 1 deletion apis/python/tests/test_soma_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def create_and_populate_dataframe(dataframe: soma.SOMADataFrame) -> None:
pydict["foo"] = [10, 20, 30, 40, 50]
pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5]
pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"]
rb = pa.RecordBatch.from_pydict(pydict)
rb = pa.Table.from_pydict(pydict)
dataframe.write(rb)


Expand Down
118 changes: 44 additions & 74 deletions apis/python/tests/test_soma_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,118 +26,88 @@ def test_soma_dataframe_non_indexed(tmp_path):
pydict["foo"] = [10, 20, 30, 40, 50]
pydict["bar"] = [4.1, 5.2, 6.3, 7.4, 8.5]
pydict["baz"] = ["apple", "ball", "cat", "dog", "egg"]
rb = pa.RecordBatch.from_pydict(pydict)
rb = pa.Table.from_pydict(pydict)
sdf.write(rb)

# ----------------------------------------------------------------
# Read all
batch = sdf.read_all()
# Weird thing about pyarrow RecordBatch:
# * We should have 5 "rows" with 3 "columns"
# * Indeed batch.num_rows is 5 and batch.num_columns is 3
# * But len(batch) is 3
# * If you thought `for record in record_batch` would print records ... you would be wrong -- it
# loops over columns
assert batch.num_rows == 5
table = sdf.read_all()
assert table.num_rows == 5

# We should be getting back the soma_rowid column as well
# If sparse dataframe:
assert batch.num_columns == 4
assert table.num_columns == 4
# If dense dataframe:
# assert batch.num_columns == 3
# assert table.num_columns == 3

# TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4]
assert [e.as_py() for e in list(batch["foo"])] == pydict["foo"]
assert [e.as_py() for e in list(batch["bar"])] == pydict["bar"]
assert [e.as_py() for e in list(batch["baz"])] == pydict["baz"]
# TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4]
assert [e.as_py() for e in list(table["foo"])] == pydict["foo"]
assert [e.as_py() for e in list(table["bar"])] == pydict["bar"]
assert [e.as_py() for e in list(table["baz"])] == pydict["baz"]

# ----------------------------------------------------------------
# Read by ids
batch = sdf.read_all(ids=[1, 2])
# Weird thing about pyarrow RecordBatch:
# * We should have 5 "rows" with 3 "columns"
# * Indeed batch.num_rows is 5 and batch.num_columns is 3
# * But len(batch) is 3
# * If you thought `for record in record_batch` would print records ... you would be wrong -- it
# loops over columns
assert batch.num_rows == 2
table = sdf.read_all(ids=[1, 2])
assert table.num_rows == 2

# We should be getting back the soma_rowid column as well
# If sparse dataframe:
assert batch.num_columns == 4
assert table.num_columns == 4
# If dense dataframe:
# assert batch.num_columns == 3
# assert table.num_columns == 3

# TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 30]
assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 6.3]
assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "cat"]
# TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 30]
assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 6.3]
assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "cat"]

# ----------------------------------------------------------------
# Read by ids
batch = sdf.read_all(ids=slice(1, 2))
# Weird thing about pyarrow RecordBatch:
# * We should have 5 "rows" with 3 "columns"
# * Indeed batch.num_rows is 5 and batch.num_columns is 3
# * But len(batch) is 3
# * If you thought `for record in record_batch` would print records ... you would be wrong -- it
# loops over columns
assert batch.num_rows == 2
table = sdf.read_all(ids=slice(1, 2))
assert table.num_rows == 2

# We should be getting back the soma_rowid column as well
# If sparse dataframe:
assert batch.num_columns == 4
assert table.num_columns == 4
# If dense dataframe:
# assert batch.num_columns == 3
# assert table.num_columns == 3

# TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 30]
assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 6.3]
assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "cat"]
# TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 30]
assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 6.3]
assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "cat"]

# ----------------------------------------------------------------
# Read by value_filter
batch = sdf.read_all(value_filter="foo == 40 or foo == 20")
# Weird thing about pyarrow RecordBatch:
# * We should have 5 "rows" with 3 "columns"
# * Indeed batch.num_rows is 5 and batch.num_columns is 3
# * But len(batch) is 3
# * If you thought `for record in record_batch` would print records ... you would be wrong -- it
# loops over columns
assert batch.num_rows == 2
table = sdf.read_all(value_filter="foo == 40 or foo == 20")
assert table.num_rows == 2

# We should be getting back the soma_rowid column as well
# If sparse dataframe:
assert batch.num_columns == 4
assert table.num_columns == 4
# If dense dataframe:
# assert batch.num_columns == 3
# assert table.num_columns == 3

# TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 40]
assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 7.4]
assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "dog"]
# TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 40]
assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 7.4]
assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "dog"]

# ----------------------------------------------------------------
# Read by value_filter
batch = sdf.read_all(value_filter='baz == "ball" or baz == "dog"')
# Weird thing about pyarrow RecordBatch:
# * We should have 5 "rows" with 3 "columns"
# * Indeed batch.num_rows is 5 and batch.num_columns is 3
# * But len(batch) is 3
# * If you thought `for record in record_batch` would print records ... you would be wrong -- it
# loops over columns
assert batch.num_rows == 2
table = sdf.read_all(value_filter='baz == "ball" or baz == "dog"')
assert table.num_rows == 2

# We should be getting back the soma_rowid column as well
# If sparse dataframe:
assert batch.num_columns == 4
assert table.num_columns == 4
# If dense dataframe:
# assert batch.num_columns == 3
# assert table.num_columns == 3

# TODO assert [e.as_py() for e in list(batch['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(batch["foo"])]) == [20, 40]
assert sorted([e.as_py() for e in list(batch["bar"])]) == [5.2, 7.4]
assert sorted([e.as_py() for e in list(batch["baz"])]) == ["ball", "dog"]
# TODO assert [e.as_py() for e in list(table['soma_rowid'])] == [0,1,2,3,4]
assert sorted([e.as_py() for e in list(table["foo"])]) == [20, 40]
assert sorted([e.as_py() for e in list(table["bar"])]) == [5.2, 7.4]
assert sorted([e.as_py() for e in list(table["baz"])]) == ["ball", "dog"]


@pytest.fixture
Expand Down Expand Up @@ -166,7 +136,7 @@ def simple_soma_data_frame(tmp_path):
"C": ["this", "is", "a", "test"],
}
n_data = len(data["soma_rowid"])
rb = pa.RecordBatch.from_pydict(data)
rb = pa.Table.from_pydict(data)
sdf.write(rb)
yield (schema, sdf, n_data)
sdf.delete()
Expand Down Expand Up @@ -217,12 +187,12 @@ def _check_tbl(tbl, col_names, ids):
)

_check_tbl(
pa.Table.from_batches(sdf.read(ids=ids, column_names=col_names)),
sdf.read_all(ids=ids, column_names=col_names),
col_names,
ids,
)
_check_tbl(
pa.Table.from_batches([sdf.read_all(column_names=col_names)]),
sdf.read_all(column_names=col_names),
col_names,
None,
)
Expand Down
Loading