[python] Conform to spec by reading as pyarrow.Table not pyarrow.RecordBatch #355
Changes from 3 commits
```diff
@@ -169,9 +169,9 @@ def read(
         # TODO: batch_size
         # TODO: partition,
         # TODO: platform_config,
-    ) -> Iterator[pa.RecordBatch]:
+    ) -> Iterator[pa.Table]:
         """
-        Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.RecordBatch``.
+        Read a user-defined subset of data, addressed by the dataframe indexing column, optionally filtered, and return results as one or more ``Arrow.Table``.

         :param ids: Which rows to read. Defaults to ``None``, meaning no constraint -- all rows.
```
```diff
@@ -217,18 +217,16 @@ def read(
         else:
             iterator = query.df[ids]

-        for df in iterator:
-            batches = df.to_batches()
-            for batch in batches:
-                # XXX COMMENT MORE
-                # This is the 'decode on read' part of our logic; in dim_select we have the
-                # 'encode on write' part.
-                # Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99.
-                #
-                # Also: don't materialize these on read
-                # TODO: get the arrow syntax for drop
-                # df.drop(ROWID, axis=1)
-                yield util_arrow.ascii_to_unicode_pyarrow_readback(batch)
+        for table in iterator:
+            # XXX COMMENT MORE
+            # This is the 'decode on read' part of our logic; in dim_select we have the
+            # 'encode on write' part.
+            # Context: https://github.com/single-cell-data/TileDB-SOMA/issues/99.
+            #
+            # Also: don't materialize these on read
+            # TODO: get the arrow syntax for drop
+            # df.drop(ROWID, axis=1)
+            yield util_arrow.ascii_to_unicode_pyarrow_readback(table)

     def read_all(
         self,
```
```diff
@@ -243,11 +241,11 @@ def read_all(
         # TODO: partition,
         # TODO: result_order,
         # TODO: platform_config,
-    ) -> pa.RecordBatch:
+    ) -> pa.Table:
         """
-        This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the record batches found. Its nominal use is to simplify unit-test cases.
+        This is a convenience method around ``read``. It iterates the return value from ``read`` and returns a concatenation of all the table-pieces found. Its nominal use is to simplify unit-test cases.
         """
-        return util_arrow.concat_batches(
+        return pa.concat_tables(
             self.read(
                 ids=ids,
                 value_filter=value_filter,
```

Review note on the return type:

> **Author:** #374 also
```diff
@@ -273,13 +271,13 @@ def _get_is_sparse(self) -> bool:
         return self._cached_is_sparse

-    def write(self, values: pa.RecordBatch) -> None:
+    def write(self, values: pa.Table) -> None:
         """
-        Write an Arrow.RecordBatch to the persistent object.
+        Write an Arrow.Table to the persistent object.

-        :param values: An Arrow.RecordBatch containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``.
+        :param values: An Arrow.Table containing all columns, including the index columns. The schema for the values must match the schema for the ``SOMADataFrame``.

-        The ``values`` Arrow RecordBatch must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written.
+        The ``values`` Arrow Table must contain a ``soma_rowid`` (uint64) column, indicating which rows are being written.
         """
         self._shape = None  # cache-invalidate
```
```diff
@@ -1,4 +1,4 @@
-from typing import Iterator, Optional, Union
+from typing import Optional, Union

 import numpy as np
 import pyarrow as pa
```

```diff
@@ -121,36 +121,20 @@ def get_arrow_schema_from_tiledb_uri(
     return pa.schema(arrow_schema_dict)


-def ascii_to_unicode_pyarrow_readback(record_batch: pa.RecordBatch) -> pa.RecordBatch:
+def ascii_to_unicode_pyarrow_readback(table: pa.Table) -> pa.Table:
     """
     Implements the 'decode on read' part of our ASCII/Unicode logic
     """
     # TODO: COMMENT/LINK HEAVILY
-    names = [ofield.name for ofield in record_batch.schema]
+    names = [ofield.name for ofield in table.schema]
     new_fields = []
     for name in names:
-        old_field = record_batch[name]
-        if isinstance(old_field, pa.LargeBinaryArray):
+        old_field = table[name]
+        if len(old_field) > 0 and isinstance(old_field[0], pa.LargeBinaryScalar):
             nfield = pa.array(
                 [element.as_py().decode("utf-8") for element in old_field]
             )
             new_fields.append(nfield)
         else:
             new_fields.append(old_field)
-    return pa.RecordBatch.from_arrays(new_fields, names=names)
-
-
-def concat_batches(batch_generator: Iterator[pa.RecordBatch]) -> pa.RecordBatch:
-    """
-    Iterates a generator of ``pyarrow.RecordBatch`` (e.g. ``SOMADataFrame.read``) and returns a concatenation of all the record batches found. The nominal use is to simplify unit-test cases.
-    """
-    batches = []
-    for batch in batch_generator:
-        batches.append(batch)
-    assert len(batches) > 0
-    names = [field.name for field in batches[0].schema]
-    arrays = []
-    for name in names:
-        array = pa.concat_arrays([batch[name] for batch in batches])
-        arrays.append(array)
-    return pa.RecordBatch.from_arrays(arrays, names=names)
+    return pa.Table.from_arrays(new_fields, names=names)
```

Review thread on the ``LargeBinaryScalar`` check:

> **bkmartinjr (Member):** I think the preferred pyarrow approach is to use the `pyarrow.types` helper functions rather than `isinstance`.
>
> **johnkerl (Author):** @bkmartinjr will do, but recall this is going away on the very next PR in this stack (#359)
Review thread on the ``ids`` parameter:

> **bkmartinjr (Member):** why not type the `ids` (aka coordinates) parameter to something more specific? See for example `SOMADenseNdArray`. I think you can literally use the `SOMADenseCoordinates` type (in `types.py`) as this is a dense dataframe, which only takes a scalar or slice.
>
> Also, we need to decide on whether or not `ids` is optional. The behavior is inconsistent between DataFrame and NdArray - in the former, `None` is "all"; in the latter, `None` is not accepted (only the explicit `slice(None)` is accepted). CC: @thetorpedodog - if you want to propose a standard convention, we can pull it through the entire package.
>
> **thetorpedodog:** If we want to make it not an Optional value, we can easily preserve the "get everything if unspecified" behavior by doing `ids: Union[Sequence[int], slice] = slice(None)`. I like this solution because it ends up with an explicit default with already-specified semantics.
>
> **johnkerl (Author):** @bkmartinjr @thetorpedodog thanks! Optionality of `ids` is orthogonal to this PR, which is solely `pa.RecordBatch` -> `pa.Table`, and has been open long enough, and rebased enough times -- I created #374 to track that.
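A standalone sketch of the `slice(None)` default proposed in the thread above (toy data, not the SOMA API):

```python
from typing import List, Sequence, Union

def read(ids: Union[Sequence[int], slice] = slice(None)) -> List[int]:
    """Non-Optional ids: the default slice(None) already means "everything"."""
    data = [10, 20, 30, 40]
    if isinstance(ids, slice):
        return data[ids]
    return [data[i] for i in ids]

assert read() == [10, 20, 30, 40]    # unspecified -> all rows
assert read(slice(1, 3)) == [20, 30]
assert read([0, 3]) == [10, 40]
```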