Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
adc1e12
feat: add support for Arrow List and LargeList types with Python bind…
qzyu999 Apr 1, 2026
06b75be
feat: add support for Arrow FixedSizeList arrays
qzyu999 Apr 1, 2026
9be4625
fix: Fix the error message in columnar_row_get_array_non_list_column_…
qzyu999 Apr 1, 2026
79cc1a2
chore: Fix the line length error after running cargo fmt
qzyu999 Apr 1, 2026
4c8427a
fix: Remove the as_encoded_bytes to allow it to revert to the default…
qzyu999 Apr 3, 2026
7ec67ac
fix: Remove the pytest skip for test_append_and_scan_with_fixed_size_…
qzyu999 Apr 3, 2026
4ea94df
feat: Add python_value_to_datum
qzyu999 Apr 3, 2026
354ac01
feat: Add more null tests
qzyu999 Apr 3, 2026
ebc27d8
feat: Add .pyi stubs for DataType, DataTypes
qzyu999 Apr 3, 2026
a697608
fix: Remove DataType, DataTypes as they are dead code only used withi…
qzyu999 Apr 3, 2026
ae8759f
perf: Hoist array downcast and type resolution out of loop
qzyu999 Apr 3, 2026
269ad19
fix: Remove the pytest skip in test_log_table for fixed_size_array
qzyu999 Apr 3, 2026
de268d6
fix: Add back the pytest.mark.skip and provide a valid reason
qzyu999 Apr 3, 2026
a076abc
feat: Add FixedSizeList to handle all Arrow list variants defensively
qzyu999 Apr 6, 2026
8c93452
feat: Add test_append_and_scan_with_large_list_array for LargeList
qzyu999 Apr 6, 2026
2ca95d8
docs: Update documentation to indicate Array data type support
qzyu999 Apr 6, 2026
068f210
refactor: Drop FixedSizeList/LargeList and use nested ColumnWriter in…
qzyu999 Apr 8, 2026
7c6c3ea
chore: Run formatting
qzyu999 Apr 8, 2026
e0d961b
fix: Remove references to FixedSizeList and LargeList
qzyu999 Apr 8, 2026
8a6742c
refactor: pass nullability to finish_list_array and add test for non-…
qzyu999 Apr 11, 2026
22f7cad
chore: Formatting
qzyu999 Apr 11, 2026
60a14d1
feat: add as_nullable, not_null, and nullable methods to DataType and…
qzyu999 Apr 11, 2026
e5267c3
test: update non-nullable array type test to verify Arrow schema fiel…
qzyu999 Apr 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError};
use pyo3::sync::PyOnceLock;
use pyo3::types::{
IntoPyDict, PyBool, PyByteArray, PyBytes, PyDate, PyDateAccess, PyDateTime, PyDelta,
PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyTime, PyTimeAccess, PyTuple, PyType,
PyTzInfo,
PyDeltaAccess, PyDict, PyList, PySequence, PySlice, PyString, PyTime, PyTimeAccess, PyTuple,
PyType, PyTzInfo,
};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
Expand Down Expand Up @@ -1240,6 +1240,68 @@ fn python_value_to_datum(
fcore::metadata::DataType::Time(_) => python_time_to_datum(value),
fcore::metadata::DataType::Timestamp(_) => python_datetime_to_timestamp_ntz(value),
fcore::metadata::DataType::TimestampLTz(_) => python_datetime_to_timestamp_ltz(value),
fcore::metadata::DataType::Array(array_type) => {
let element_type = array_type.get_element_type();
if value.is_instance_of::<PyString>() {
return Err(FlussError::new_err(format!(
"Expected sequence for Array column, got {}",
get_type_name(value)
)));
}
let seq = value.downcast::<PySequence>().map_err(|_| {
FlussError::new_err(format!(
"Expected sequence for Array column, got {}",
get_type_name(value)
))
})?;

let len = seq.len()?;
let mut writer = fcore::row::binary_array::FlussArrayWriter::new(len, element_type);

for i in 0..len {
let item = seq.get_item(i)?;
if item.is_none() {
writer.set_null_at(i);
} else {
let val_datum = python_value_to_datum(&item, element_type)?;
match val_datum {
Datum::Null => writer.set_null_at(i),
Datum::Bool(v) => writer.write_boolean(i, v),
Datum::Int8(v) => writer.write_byte(i, v),
Datum::Int16(v) => writer.write_short(i, v),
Datum::Int32(v) => writer.write_int(i, v),
Datum::Int64(v) => writer.write_long(i, v),
Datum::Float32(v) => writer.write_float(i, v.into_inner()),
Datum::Float64(v) => writer.write_double(i, v.into_inner()),
Datum::String(v) => writer.write_string(i, &v),
Datum::Blob(v) => writer.write_binary_bytes(i, v.as_ref()),
Datum::Decimal(v) => {
if let fcore::metadata::DataType::Decimal(dt) = element_type {
writer.write_decimal(i, &v, dt.precision());
}
}
Datum::Date(v) => writer.write_date(i, v),
Datum::Time(v) => writer.write_time(i, v),
Datum::TimestampNtz(v) => {
if let fcore::metadata::DataType::Timestamp(dt) = element_type {
writer.write_timestamp_ntz(i, &v, dt.precision());
}
}
Datum::TimestampLtz(v) => {
if let fcore::metadata::DataType::TimestampLTz(dt) = element_type {
writer.write_timestamp_ltz(i, &v, dt.precision());
}
}
Datum::Array(v) => writer.write_array(i, &v),
}
}
}

let array = writer
.complete()
.map_err(|e| FlussError::from_core_error(&e))?;
Ok(Datum::Array(array))
}
_ => Err(FlussError::new_err(format!(
"Unsupported data type for row-level operations: {data_type}"
))),
Expand Down Expand Up @@ -1372,6 +1434,20 @@ pub fn datum_to_python_value(
.map_err(|e| FlussError::from_core_error(&e))?;
rust_timestamp_ltz_to_python(py, ts)
}
DataType::Array(array_type) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about python_value_to_datum?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fresh-borzoni, thanks for catching this, added functionality for python_value_to_datum in 7dec9da along with a set of tests.

let array_data = row
.get_array(pos)
.map_err(|e| FlussError::from_core_error(&e))?;

let element_type = array_type.get_element_type();
let py_list = pyo3::types::PyList::empty(py);

for i in 0..array_data.size() {
let py_val = datum_to_python_value(py, &array_data, i, element_type)?;
py_list.append(py_val)?;
}
Ok(py_list.into_any().unbind())
}
_ => Err(FlussError::new_err(format!(
"Unsupported data type for conversion to Python: {data_type}"
))),
Expand Down
4 changes: 4 additions & 0 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ impl Utils {
ArrowDataType::Decimal128(precision, scale) => {
DataTypes::decimal(*precision as u32, *scale as u32)
}
ArrowDataType::List(field) => {
let element_type = Utils::arrow_type_to_fluss_type(field.data_type())?;
DataTypes::array(element_type)
}
_ => {
return Err(FlussError::new_err(format!(
"Unsupported Arrow data type: {arrow_type:?}"
Expand Down
202 changes: 200 additions & 2 deletions bindings/python/test/test_log_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import time

import pyarrow as pa
import pytest

import fluss

Expand Down Expand Up @@ -1120,8 +1121,6 @@ def _poll_records(scanner, expected_count, timeout_s=10):
return collected




def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
"""Poll a batch scanner and extract 'id' column values."""
all_ids = []
Expand All @@ -1132,3 +1131,202 @@ def _poll_arrow_ids(scanner, expected_count, timeout_s=10):
all_ids.extend(arrow_table.column("id").to_pylist())
return all_ids


async def test_append_and_scan_with_array(connection, admin):
    """Test appending and scanning with array (list) columns.

    Covers plain lists, null elements inside arrays, null arrays, and empty
    arrays, verified through both the record-by-record LogScanner and the
    batch-based Arrow scanner.
    """
    table_path = fluss.TablePath("fluss", "py_test_append_and_scan_with_array")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    pa_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("tags", pa.list_(pa.string())),
            pa.field("scores", pa.list_(pa.int32())),
        ]
    )
    schema = fluss.Schema(pa_schema)
    table_descriptor = fluss.TableDescriptor(schema)
    await admin.create_table(table_path, table_descriptor, ignore_if_exists=False)

    table = await connection.get_table(table_path)
    append_writer = table.new_append().create_writer()

    # Batch 1: fully-populated lists.
    batch1 = pa.RecordBatch.from_arrays(
        [
            pa.array([1, 2], type=pa.int32()),
            pa.array([["a", "b"], ["c"]], type=pa.list_(pa.string())),
            pa.array([[10, 20], [30]], type=pa.list_(pa.int32())),
        ],
        schema=pa_schema,
    )
    append_writer.write_arrow_batch(batch1)

    # Batch 2: null elements inside arrays, null arrays, and empty arrays.
    batch2 = pa.RecordBatch.from_arrays(
        [
            pa.array([3, 4, 5, 6], type=pa.int32()),
            pa.array([["d", None], None, [], [None]], type=pa.list_(pa.string())),
            pa.array([[40, 50], [60], None, []], type=pa.list_(pa.int32())),
        ],
        schema=pa_schema,
    )
    append_writer.write_arrow_batch(batch2)
    await append_writer.flush()

    # Verify via LogScanner (record-by-record).
    # NOTE(review): only bucket 0 is subscribed here, and the Arrow
    # verification below relies on append order — this assumes the table is
    # created with a single bucket; sibling tests subscribe all buckets.
    scanner = await table.new_scan().create_log_scanner()
    scanner.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
    records = _poll_records(scanner, expected_count=6)

    assert len(records) == 6
    records.sort(key=lambda r: r.row["id"])

    # Verify Batch 1
    assert records[0].row["tags"] == ["a", "b"]
    assert records[0].row["scores"] == [10, 20]
    assert records[1].row["tags"] == ["c"]
    assert records[1].row["scores"] == [30]

    # Verify Batch 2
    assert records[2].row["tags"] == ["d", None]
    assert records[2].row["scores"] == [40, 50]
    assert records[3].row["tags"] is None
    assert records[3].row["scores"] == [60]
    assert records[4].row["tags"] == []
    assert records[4].row["scores"] is None
    assert records[5].row["tags"] == [None]
    assert records[5].row["scores"] == []

    # Verify via to_arrow (batch-based).
    scanner2 = await table.new_scan().create_record_batch_log_scanner()
    scanner2.subscribe_buckets({0: fluss.EARLIEST_OFFSET})
    result_table = scanner2.to_arrow()

    assert result_table.num_rows == 6
    assert result_table.column("tags").to_pylist() == [
        ["a", "b"],
        ["c"],
        ["d", None],
        None,
        [],
        [None],
    ]
    assert result_table.column("scores").to_pylist() == [
        [10, 20],
        [30],
        [40, 50],
        [60],
        None,
        [],
    ]

    # Clean up, consistent with the other array tests in this module.
    await admin.drop_table(table_path, ignore_if_not_exists=False)




async def test_append_rows_with_array(connection, admin):
    """Append rows carrying array values as plain Python lists, then scan them back."""
    table_path = fluss.TablePath("fluss", "py_test_append_rows_with_array")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    pa_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("tags", pa.list_(pa.string())),
            pa.field("scores", pa.list_(pa.int32())),
        ]
    )
    await admin.create_table(
        table_path, fluss.TableDescriptor(fluss.Schema(pa_schema)), ignore_if_exists=False
    )

    table = await connection.get_table(table_path)
    writer = table.new_append().create_writer()

    # Dict-style rows whose array columns are plain Python lists.
    writer.append({"id": 1, "tags": ["a", "b"], "scores": [10, 20]})
    writer.append({"id": 2, "tags": ["c"], "scores": [30]})
    # Positional row: null array plus a list containing a null element.
    writer.append([3, None, [40, None, 60]])

    await writer.flush()

    scanner = await table.new_scan().create_log_scanner()
    num_buckets = (await admin.get_table_info(table_path)).num_buckets
    scanner.subscribe_buckets({b: fluss.EARLIEST_OFFSET for b in range(num_buckets)})

    records = _poll_records(scanner, expected_count=3)
    assert len(records) == 3

    rows = sorted((r.row for r in records), key=lambda row: row["id"])
    assert rows[0] == {"id": 1, "tags": ["a", "b"], "scores": [10, 20]}
    assert rows[1] == {"id": 2, "tags": ["c"], "scores": [30]}
    # The positional row round-trips as a null "tags" array and a "scores"
    # list that preserves its embedded null.
    assert rows[2]["id"] == 3
    assert rows[2]["tags"] is None
    assert rows[2]["scores"] == [40, None, 60]

    await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_append_rows_with_nested_array(connection, admin):
    """Round-trip rows holding nested arrays (ARRAY<ARRAY<INT>>) through append and scan."""
    table_path = fluss.TablePath("fluss", "py_test_append_rows_with_nested_array")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    pa_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("matrix", pa.list_(pa.list_(pa.int32()))),
        ]
    )
    await admin.create_table(
        table_path, fluss.TableDescriptor(fluss.Schema(pa_schema)), ignore_if_exists=False
    )

    table = await connection.get_table(table_path)
    writer = table.new_append().create_writer()

    # Cover regular matrices, empty inner lists, a null matrix, null inner
    # lists, and null elements inside inner lists.
    payloads = [
        {"id": 1, "matrix": [[1, 2], [3, 4]]},
        {"id": 2, "matrix": [[], [5], [6, 7, 8]]},
        {"id": 3, "matrix": None},
        {"id": 4, "matrix": [[1, None], None, []]},
        {"id": 5, "matrix": [[None, None]]},
    ]
    for payload in payloads:
        writer.append(payload)

    await writer.flush()

    scanner = await table.new_scan().create_log_scanner()
    num_buckets = (await admin.get_table_info(table_path)).num_buckets
    scanner.subscribe_buckets({b: fluss.EARLIEST_OFFSET for b in range(num_buckets)})

    records = _poll_records(scanner, expected_count=5)
    assert len(records) == 5

    rows = sorted((r.row for r in records), key=lambda row: row["id"])
    assert rows == [
        {"id": 1, "matrix": [[1, 2], [3, 4]]},
        {"id": 2, "matrix": [[], [5], [6, 7, 8]]},
        {"id": 3, "matrix": None},
        {"id": 4, "matrix": [[1, None], None, []]},
        {"id": 5, "matrix": [[None, None]]},
    ]

    await admin.drop_table(table_path, ignore_if_not_exists=False)


async def test_append_rows_with_invalid_array(connection, admin):
    """A non-sequence value written to an array column must be rejected."""
    table_path = fluss.TablePath("fluss", "py_test_append_rows_with_invalid_array")
    await admin.drop_table(table_path, ignore_if_not_exists=True)

    pa_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("tags", pa.list_(pa.string())),
        ]
    )
    await admin.create_table(
        table_path, fluss.TableDescriptor(fluss.Schema(pa_schema)), ignore_if_exists=False
    )

    table = await connection.get_table(table_path)
    writer = table.new_append().create_writer()

    # A bare string must not be treated as an array value, even though it is
    # technically a Python sequence.
    with pytest.raises(Exception, match="Expected sequence for Array column"):
        writer.append({"id": 4, "tags": "not_a_list"})

    await admin.drop_table(table_path, ignore_if_not_exists=False)
13 changes: 13 additions & 0 deletions bindings/python/test/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,16 @@ def test_get_primary_keys():
assert schema_without_pk.get_primary_keys() == []


def test_schema_with_array():
    """A fluss.Schema can be built from a pyarrow schema containing a list field.

    Verifies that the Arrow List type is translated to the Fluss
    ``array<...>`` column type string.
    """
    fields = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("tags", pa.list_(pa.string())),
        ]
    )
    schema = fluss.Schema(fields)
    assert schema.get_column_names() == ["id", "tags"]
    assert schema.get_column_types() == ["int", "array<string>"]


Loading