-
Notifications
You must be signed in to change notification settings - Fork 37
feat: (Python) Add async context manager #487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
2a29c63
dec0921
180ad11
a1cadf2
ff10371
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -40,9 +40,8 @@ async def main(): | |||||||||
| config = fluss.Config(config_spec) | ||||||||||
|
|
||||||||||
| # Create connection using the static create method | ||||||||||
| conn = await fluss.FlussConnection.create(config) | ||||||||||
|
|
||||||||||
| # Define fields for PyArrow | ||||||||||
| async with await fluss.FlussConnection.create(config) as conn: | ||||||||||
| # Define fields for PyArrow | ||||||||||
| fields = [ | ||||||||||
| pa.field("id", pa.int32()), | ||||||||||
| pa.field("name", pa.string()), | ||||||||||
|
|
@@ -105,8 +104,8 @@ async def main(): | |||||||||
| print(f"Got table: {table}") | ||||||||||
|
|
||||||||||
| # Create a writer for the table | ||||||||||
| append_writer = table.new_append().create_writer() | ||||||||||
| print(f"Created append writer: {append_writer}") | ||||||||||
| async with table.new_append().create_writer() as append_writer: | ||||||||||
| print(f"Created append writer: {append_writer}") | ||||||||||
|
|
||||||||||
| try: | ||||||||||
| # Demo: Write PyArrow Table | ||||||||||
|
|
@@ -240,10 +239,10 @@ async def main(): | |||||||||
| append_writer.write_pandas(df) | ||||||||||
| print("Successfully wrote Pandas DataFrame") | ||||||||||
|
|
||||||||||
| # Flush all pending data | ||||||||||
| print("\n--- Flushing data ---") | ||||||||||
| await append_writer.flush() | ||||||||||
| print("Successfully flushed data") | ||||||||||
| # Note: flush() and close() are automatically called by the 'async with' block on successful exit. | ||||||||||
|
||||||||||
| # Note: flush() and close() are automatically called by the 'async with' block on successful exit. | |
| # Note: flush() is automatically called by the 'async with' block on successful exit. |
Outdated
Copilot
AI
Apr 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment claims the async with block will automatically call both flush() and close(), but UpsertWriter.__aexit__ currently only flushes and does not close/invalidate the writer. Please update the example comment to reflect reality (or add close semantics and invoke them from __aexit__).
| # flush() and close() are automatically called by the 'async with' block on successful exit. | |
| # Bypass manual flush: | |
| # flush() is automatically called by the 'async with' block on successful exit. | |
| # No manual flush is needed here: |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,6 +104,28 @@ impl FlussConnection { | |
| Ok(false) | ||
| } | ||
|
|
||
| // Enter the async runtime context (for 'async with' statement) | ||
| fn __aenter__<'py>(slf: PyRef<'py, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { | ||
| let py_slf = slf.into_pyobject(py)?.unbind(); | ||
| future_into_py(py, async move { Ok(py_slf) }) | ||
| } | ||
|
|
||
| // Exit the async runtime context (for 'async with' statement) | ||
| #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))] | ||
| fn __aexit__<'py>( | ||
| &mut self, | ||
| py: Python<'py>, | ||
| _exc_type: Option<Bound<'py, PyAny>>, | ||
| _exc_value: Option<Bound<'py, PyAny>>, | ||
| _traceback: Option<Bound<'py, PyAny>>, | ||
| ) -> PyResult<Bound<'py, PyAny>> { | ||
| future_into_py(py, async move { | ||
| // In the future, we could call an async close on the core connection here | ||
| // e.g., client.close().await; | ||
| Ok(false) | ||
| }) | ||
| } | ||
|
Comment on lines
+113
to
+128
|
||
|
|
||
| fn __repr__(&self) -> String { | ||
| "FlussConnection()".to_string() | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
async with await FlussConnection.create(...) as conn:block has no executable statement in its body (only a comment), andfields = [...]is dedented. This will raise a SyntaxError (“expected an indented block”) and also ends the connection context immediately. Please indent the subsequent setup logic under theasync with(or add a real statement inside the block) so the connection remains open for the rest ofmain().There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Ran into the following running the example