Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions go/adbc/driver/flightsql/flightsql_statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ const (
// so this is not entirely necessary depending on the version
// of substrait and the capabilities of the server.
OptionStatementSubstraitVersion = "adbc.flight.sql.substrait.version"
// OptionStatementIsUpdate is a read-only option that indicates whether a
// prepared statement performs an update (DML) rather than returning a
// result set. The value is "true" or "false". If the server did not
// include the hint in its CreatePreparedStatement response, reading this
// option returns StatusNotFound.
OptionStatementIsUpdate = "adbc.flight.sql.is_update"
)

func atomicLoadFloat64(x *float64) float64 {
Expand Down Expand Up @@ -256,6 +262,24 @@ func (s *statement) GetOption(key string) (string, error) {
return adbc.OptionValueEnabled, nil
}
return adbc.OptionValueDisabled, nil
case OptionStatementIsUpdate:
if s.prepared == nil {
return "", adbc.Error{
Msg: "[Flight SQL] adbc.flight.sql.is_update is only available after Prepare()",
Code: adbc.StatusNotFound,
}
}
isUpdate := s.prepared.IsUpdate()
if isUpdate == nil {
return "", adbc.Error{
Msg: "[Flight SQL] server did not provide is_update hint for this prepared statement",
Code: adbc.StatusNotFound,
}
}
if *isUpdate {
return adbc.OptionValueEnabled, nil
}
return adbc.OptionValueDisabled, nil
}

if strings.HasPrefix(key, OptionRPCCallHeaderPrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ class ConnectionOptions(enum.Enum):
class StatementOptions(enum.Enum):
"""Statement options specific to the Flight SQL driver."""

#: Whether the prepared statement performs an update (DML) rather than
#: returning a result set.
#:
#: Read-only. Only available after the statement has been prepared.
#: Returns "true" or "false". If the server did not include this hint
#: in its CreatePreparedStatement response, reading this option raises
#: an error.
IS_UPDATE = "adbc.flight.sql.is_update"
#: The latest FlightInfo value.
#:
#: Thread-safe. Mostly useful when using incremental execution, where an
Expand Down
23 changes: 19 additions & 4 deletions python/adbc_driver_manager/adbc_driver_manager/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import abc
import datetime
import logging
import os
import pathlib
import threading
Expand Down Expand Up @@ -72,6 +73,8 @@
# ----------------------------------------------------------
# Globals

_logger = logging.getLogger(__name__)

#: The DB-API API level (2.0).
apilevel = "2.0"
#: The thread safety level (connections may not be shared).
Expand Down Expand Up @@ -921,10 +924,22 @@ def execute(self, operation: Union[bytes, str], parameters=None) -> "Self":
self._clear()
self._prepare_execute(operation, parameters)

handle, self._rowcount = _blocking_call(
self._stmt.execute_query, (), {}, self._stmt.cancel
)
self._results = _RowIterator(self._stmt, handle, self._backend)
is_update = False
try:
val = self._stmt.get_option("adbc.flight.sql.is_update")
is_update = val.lower() == "true"
except (adbc_driver_manager.NotSupportedError, adbc_driver_manager.ProgrammingError) as e:
_logger.debug("adbc.flight.sql.is_update option not available: %s", e)

if is_update:
self._rowcount = _blocking_call(
self._stmt.execute_update, (), {}, self._stmt.cancel
)
else:
handle, self._rowcount = _blocking_call(
self._stmt.execute_query, (), {}, self._stmt.cancel
)
self._results = _RowIterator(self._stmt, handle, self._backend)
return self

def executemany(self, operation: Union[bytes, str], seq_of_parameters) -> None:
Expand Down