From 2b20d3e1a186662f2d4a7afce82e852c9ea77d27 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Mon, 2 Mar 2026 17:01:30 -0800 Subject: [PATCH 1/6] WIP --- pymongo/asynchronous/network.py | 201 ++++++------------- pymongo/synchronous/network.py | 201 ++++++------------- pymongo/telemetry.py | 339 ++++++++++++++++++++++++++++++++ pymongo/tracing.py | 132 +++++++++++++ pyproject.toml | 1 + uv.lock | 19 +- 6 files changed, 618 insertions(+), 275 deletions(-) create mode 100644 pymongo/telemetry.py create mode 100644 pymongo/tracing.py diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index 5a5dc7fa2c..b584dc8adb 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -31,17 +30,14 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message from pymongo.compression_support import _NO_COMPRESSION -from pymongo.errors import ( - NotPrimaryError, - OperationFailure, -) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( async_receive_message, async_sendall, ) +from pymongo.telemetry import command_telemetry +from pymongo.tracing import add_cursor_id if TYPE_CHECKING: from bson import CodecOptions @@ -159,140 +155,71 @@ async def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - ) - try: - await async_sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} - else: - reply = await async_receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) - - response_doc = unpacked_docs[0] - if not conn.ready: - cluster_time = response_doc.get("$clusterTime") - if cluster_time: - conn._cluster_time = cluster_time - if client: - await client._process_response(response_doc, session) - if check: - helpers_shared._check_command_response( - response_doc, - conn.max_wire_version, - allowable_errors, - parse_write_concern_error=parse_write_concern_error, - ) - except Exception as exc: - duration = datetime.datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = message._convert_exception(exc) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), + with command_telemetry( + command_name=name, + database_name=dbname, + spec=spec, + address=address if address else conn.address, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + client=client, + listeners=listeners, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + await async_sendall(conn.conn.get_conn, msg) + if use_op_msg and unacknowledged: + # Unacknowledged, fake a successful command response. + reply = None + response_doc: _DocumentOut = {"ok": 1} + else: + reply = await async_receive_message(conn, request_id) + conn.more_to_come = reply.more_to_come + unpacked_docs = reply.unpack_response( + codec_options=codec_options, user_fields=user_fields ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbname, - ) - raise - duration = datetime.datetime.now() - start - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - response_doc, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, + + response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time + if client: + await client._process_response(response_doc, session) + if check: + helpers_shared._check_command_response( + response_doc, + conn.max_wire_version, + allowable_errors, + parse_write_concern_error=parse_write_concern_error, + ) + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Add cursor_id to span if present in response + if telemetry.span is not None and isinstance(response_doc, dict): + cursor_info = response_doc.get("cursor") + if cursor_info and isinstance(cursor_info, dict): + cursor_id = cursor_info.get("id", 0) + if cursor_id: + add_cursor_id(telemetry.span, cursor_id) + + # Publish command succeeded event + telemetry.publish_succeeded( + reply=response_doc, speculative_hello=speculative_hello, - database_name=dbname, + speculative_authenticate="speculativeAuthenticate" in orig, ) - if client and client._encrypter and reply: - decrypted = await client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] - ) + if client and client._encrypter and reply: + decrypted = await client._encrypter.decrypt(reply.raw_command_response()) + response_doc = cast( + "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + ) - return response_doc # type: ignore[return-value] + return response_doc # type: ignore[return-value] diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 7d9bca4d58..497535cb43 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -16,7 +16,6 @@ from __future__ import annotations import datetime -import logging from typing import ( TYPE_CHECKING, Any, @@ -31,17 +30,14 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message from pymongo.compression_support import _NO_COMPRESSION -from pymongo.errors import ( - NotPrimaryError, - OperationFailure, -) -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _OpMsg from pymongo.monitoring import _is_speculative_authenticate from pymongo.network_layer import ( receive_message, sendall, ) +from pymongo.telemetry import command_telemetry +from pymongo.tracing import add_cursor_id if TYPE_CHECKING: from bson import CodecOptions @@ -159,140 +155,71 @@ def command( if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD: message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=spec, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - ) - try: - sendall(conn.conn.get_conn, msg) - if use_op_msg and unacknowledged: - # Unacknowledged, fake a successful command response. - reply = None - response_doc: _DocumentOut = {"ok": 1} - else: - reply = receive_message(conn, request_id) - conn.more_to_come = reply.more_to_come - unpacked_docs = reply.unpack_response( - codec_options=codec_options, user_fields=user_fields - ) - - response_doc = unpacked_docs[0] - if not conn.ready: - cluster_time = response_doc.get("$clusterTime") - if cluster_time: - conn._cluster_time = cluster_time - if client: - client._process_response(response_doc, session) - if check: - helpers_shared._check_command_response( - response_doc, - conn.max_wire_version, - allowable_errors, - parse_write_concern_error=parse_write_concern_error, - ) - except Exception as exc: - duration = datetime.datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = message._convert_exception(exc) - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), + with command_telemetry( + command_name=name, + database_name=dbname, + spec=spec, + address=address if address else conn.address, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + client=client, + listeners=listeners, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + sendall(conn.conn.get_conn, msg) + if use_op_msg and unacknowledged: + # Unacknowledged, fake a successful command response. + reply = None + response_doc: _DocumentOut = {"ok": 1} + else: + reply = receive_message(conn, request_id) + conn.more_to_come = reply.more_to_come + unpacked_docs = reply.unpack_response( + codec_options=codec_options, user_fields=user_fields ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbname, - ) - raise - duration = datetime.datetime.now() - start - if client is not None: - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=response_doc, - commandName=next(iter(spec)), - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - response_doc, - name, - request_id, - address, - conn.server_connection_id, - service_id=conn.service_id, + + response_doc = unpacked_docs[0] + if not conn.ready: + cluster_time = response_doc.get("$clusterTime") + if cluster_time: + conn._cluster_time = cluster_time + if client: + client._process_response(response_doc, session) + if check: + helpers_shared._check_command_response( + response_doc, + conn.max_wire_version, + allowable_errors, + parse_write_concern_error=parse_write_concern_error, + ) + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Add cursor_id to span if present in response + if telemetry.span is not None and isinstance(response_doc, dict): + cursor_info = response_doc.get("cursor") + if cursor_info and isinstance(cursor_info, dict): + cursor_id = cursor_info.get("id", 0) + if cursor_id: + add_cursor_id(telemetry.span, cursor_id) + + # Publish command succeeded event + telemetry.publish_succeeded( + reply=response_doc, speculative_hello=speculative_hello, - database_name=dbname, + speculative_authenticate="speculativeAuthenticate" in orig, ) - if client and client._encrypter and reply: - decrypted = client._encrypter.decrypt(reply.raw_command_response()) - response_doc = cast( - "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] - ) + if client and client._encrypter and reply: + decrypted = client._encrypter.decrypt(reply.raw_command_response()) + response_doc = cast( + "_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0] + ) - return response_doc # type: ignore[return-value] + return response_doc # type: ignore[return-value] diff --git a/pymongo/telemetry.py b/pymongo/telemetry.py new file mode 100644 index 0000000000..e909c08995 --- /dev/null +++ b/pymongo/telemetry.py @@ -0,0 +1,339 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unified telemetry support for PyMongo. + +Supports telemetry using standardized logging, event publishing, and OpenTelemetry. + +To enable OpenTelemetry logging, set the environment variable: + OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED=true + +.. versionadded:: 4.x +""" +from __future__ import annotations + +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Any, Mapping, Optional + +from pymongo import message +from pymongo.errors import NotPrimaryError, OperationFailure +from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.monitoring import _EventListeners +from pymongo.tracing import ( + _build_query_summary, + _extract_collection_name, + _get_tracer, + _is_sensitive_command, +) + +try: + from opentelemetry import trace + from opentelemetry.trace import Span, SpanKind, Status, StatusCode + + _HAS_OPENTELEMETRY = True +except ImportError: + _HAS_OPENTELEMETRY = False + trace = None # type: ignore[assignment] + Span = None # type: ignore[assignment, misc] + SpanKind = None # type: ignore[assignment, misc] + Status = None # type: ignore[assignment, misc] + StatusCode = None # type: ignore[assignment, misc] + +if TYPE_CHECKING: + from pymongo.typings import _Address, _AgnosticMongoClient, _DocumentOut + + +class _CommandTelemetry: + """Manages telemetry for MongoDB commands, including logging, event publishing, and OpenTelemetry spans. + + This class is a context manager that handles the full lifecycle of command telemetry: + - On entry (__enter__): Sets up OpenTelemetry span and publishes the started event + - On exit (__exit__): Cleans up the span context (caller handles success/failure publishing) + """ + + __slots__ = ( + "_command_name", + "_database_name", + "_spec", + "_driver_connection_id", + "_server_connection_id", + "_publish_event", + "_start_time", + "_address", + "_listeners", + "_client", + "_request_id", + "_service_id", + "_span", + "_span_context", + ) + + def __init__( + self, + command_name: str, + database_name: str, + spec: Mapping[str, Any], + driver_connection_id: int, + server_connection_id: Optional[int], + publish_event: bool, + start_time: datetime, + address: Optional[_Address], + listeners: Optional[_EventListeners], + client: Optional[_AgnosticMongoClient], + request_id: Optional[int], + service_id: Optional[Any], + ): + self._command_name = command_name + self._database_name = database_name + self._spec = spec + self._driver_connection_id = driver_connection_id + self._server_connection_id = server_connection_id + self._publish_event = publish_event + self._start_time = start_time + self._address = address + self._listeners = listeners + self._client = client + self._request_id = request_id + self._service_id = service_id + self._span: Optional[Span] = None + self._span_context: Optional[Any] = None + + def __enter__(self) -> _CommandTelemetry: + """Enter the telemetry context: set up span and publish started event.""" + self._setup_span() + self.publish_started() + return self + + def __exit__( + self, + exc_type: Optional[type], + exc_val: Optional[BaseException], + exc_tb: Optional[Any], + ) -> None: + """Exit the telemetry context: clean up span context.""" + if self._span_context is not None: + self._span_context.__exit__(exc_type, exc_val, exc_tb) + + def _setup_span(self) -> None: + """Set up OpenTelemetry span if tracing is enabled and command is not sensitive.""" + tracer = _get_tracer() + + if tracer is None or _is_sensitive_command(self._command_name): + return + + collection_name = _extract_collection_name(self._spec) + query_summary = _build_query_summary( + self._command_name, self._database_name, collection_name + ) + + self._span_context = tracer.start_as_current_span( + name=self._command_name, + kind=SpanKind.CLIENT, + ) + self._span = self._span_context.__enter__() + + # Set span attributes + self._span.set_attribute("db.system", "mongodb") + self._span.set_attribute("db.namespace", self._database_name) + self._span.set_attribute("db.command.name", self._command_name) + self._span.set_attribute("db.query.summary", query_summary) + if self._address: + self._span.set_attribute("server.address", self._address[0]) + self._span.set_attribute("server.port", self._address[1]) + self._span.set_attribute("network.transport", "tcp") + self._span.set_attribute("db.mongodb.driver_connection_id", self._driver_connection_id) + + if collection_name: + self._span.set_attribute("db.collection.name", collection_name) + if self._server_connection_id is not None: + self._span.set_attribute("db.mongodb.server_connection_id", self._server_connection_id) + + @property + def span(self) -> Optional[Span]: + """Return the OpenTelemetry span, or None if tracing is disabled.""" + return self._span + + def publish_started(self) -> None: + """Publish command started event and log.""" + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.STARTED, + clientId=self._client._topology_settings._topology_id, + command=self._spec, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_start( + self._spec, + self._database_name, + self._request_id, + self._address, + self._server_connection_id, + service_id=self._service_id, + ) + + def publish_succeeded( + self, + reply: _DocumentOut, + speculative_hello: bool = False, + speculative_authenticate: bool = False, + ) -> None: + """Publish command succeeded event and log.""" + duration = datetime.now() - self._start_time + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.SUCCEEDED, + clientId=self._client._topology_settings._topology_id, + durationMS=duration, + reply=reply, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + speculative_authenticate=speculative_authenticate, + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_success( + duration, + reply, + self._command_name, + self._request_id, + self._address, + self._server_connection_id, + service_id=self._service_id, + speculative_hello=speculative_hello, + database_name=self._database_name, + ) + + def publish_failed(self, exc: Exception) -> None: + """Publish command failed event and log.""" + duration = datetime.now() - self._start_time + if isinstance(exc, (NotPrimaryError, OperationFailure)): + failure: _DocumentOut = exc.details # type: ignore[assignment] + else: + failure = message._convert_exception(exc) + + if self._span is not None: + error_code = getattr(exc, "code", None) + self._span.record_exception(exc) + self._span.set_status(Status(StatusCode.ERROR, str(exc))) + + if error_code is not None: + self._span.set_attribute("db.response.status_code", str(error_code)) + if self._client is not None: + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.FAILED, + clientId=self._client._topology_settings._topology_id, + durationMS=duration, + failure=failure, + commandName=next(iter(self._spec)), + databaseName=self._database_name, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._driver_connection_id, + serverConnectionId=self._server_connection_id, + serverHost=self._address[0] if self._address else None, + serverPort=self._address[1] if self._address else None, + serviceId=self._service_id, + isServerSideError=isinstance(exc, OperationFailure), + ) + if self._publish_event: + assert self._listeners is not None + assert self._address is not None + self._listeners.publish_command_failure( + duration, + failure, + self._command_name, + self._request_id, + self._address, + self._server_connection_id, + service_id=self._service_id, + database_name=self._database_name, + ) + + +def command_telemetry( + command_name: str, + database_name: str, + spec: Mapping[str, Any], + driver_connection_id: int, + server_connection_id: Optional[int], + publish_event: bool, + start_time: datetime, + address: Optional[_Address] = None, + listeners: Optional[_EventListeners] = None, + client: Optional[_AgnosticMongoClient] = None, + request_id: Optional[int] = None, + service_id: Optional[Any] = None, +) -> _CommandTelemetry: + """Create a _CommandTelemetry context manager for command telemetry. + + Returns a _CommandTelemetry instance that should be used as a context manager. + The context manager automatically: + - Sets up OpenTelemetry span if tracing is enabled and command is not sensitive + - Publishes the started event on entry + - Cleans up the span context on exit + + The caller is responsible for calling publish_succeeded() on successful completion + and publish_failed() if an exception occurs. + + Example usage:: + + with command_telemetry(...) as telemetry: + try: + # execute command + result = execute_command() + except Exception as exc: + telemetry.publish_failed(exc) + raise + telemetry.publish_succeeded(result) + """ + return _CommandTelemetry( + command_name=command_name, + database_name=database_name, + spec=spec, + driver_connection_id=driver_connection_id, + server_connection_id=server_connection_id, + publish_event=publish_event, + start_time=start_time, + address=address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=service_id, + ) diff --git a/pymongo/tracing.py b/pymongo/tracing.py new file mode 100644 index 0000000000..3c89a69ee9 --- /dev/null +++ b/pymongo/tracing.py @@ -0,0 +1,132 @@ +# Copyright 2026-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenTelemetry tracing support for PyMongo. + +This module provides optional OpenTelemetry tracing for MongoDB commands. +Tracing is disabled by default and requires the opentelemetry-api package. + +To enable tracing, set the environment variable: + OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED=true + +.. versionadded:: 4.x +""" +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any, Mapping, Optional + +from pymongo.logger import _SENSITIVE_COMMANDS + +try: + from opentelemetry import trace + from opentelemetry.trace import Span, SpanKind, Status, StatusCode + + _HAS_OPENTELEMETRY = True +except ImportError: + _HAS_OPENTELEMETRY = False + trace = None # type: ignore[assignment] + Span = None # type: ignore[assignment, misc] + SpanKind = None # type: ignore[assignment, misc] + Status = None # type: ignore[assignment, misc] + StatusCode = None # type: ignore[assignment, misc] + +if TYPE_CHECKING: + from opentelemetry.trace import Tracer + +# Environment variable names +_OTEL_ENABLED_ENV = "OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED" + + +def _is_tracing_enabled() -> bool: + """Check if tracing is enabled via environment variable.""" + if not _HAS_OPENTELEMETRY: + return False + value = os.environ.get(_OTEL_ENABLED_ENV, "").lower() + return value in ("1", "true") + + +def _get_tracer() -> Optional[Tracer]: + """Get the PyMongo tracer instance.""" + if not _HAS_OPENTELEMETRY or not _is_tracing_enabled(): + return None + from pymongo._version import __version__ + + return trace.get_tracer("PyMongo", __version__) + + +def _is_sensitive_command(command_name: str) -> bool: + """Check if a command is sensitive and should not be traced.""" + return command_name.lower() in _SENSITIVE_COMMANDS + + +def _build_query_summary( + command_name: str, + database_name: str, + collection_name: Optional[str], +) -> str: + """Build the db.query.summary attribute value.""" + if collection_name: + return f"{command_name} {database_name}.{collection_name}" + return f"{command_name} {database_name}" + + +def _extract_collection_name(spec: Mapping[str, Any]) -> Optional[str]: + """Extract collection name from command spec if applicable.""" + if not spec: + return None + cmd_name = next(iter(spec)).lower() + # Commands where the first value is the collection name + if cmd_name in ( + "insert", + "update", + "delete", + "find", + "aggregate", + "findandmodify", + "count", + "distinct", + "create", + "drop", + "createindexes", + "dropindexes", + "listindexes", + ): + value = spec.get(next(iter(spec))) + if isinstance(value, str): + return value + return None + + +def record_command_exception( + span: Optional[Span], + exception: BaseException, + error_code: Optional[int] = None, +) -> None: + """Record an exception on a command span.""" + if span is None or not _HAS_OPENTELEMETRY: + return + + span.record_exception(exception) + span.set_status(Status(StatusCode.ERROR, str(exception))) + + if error_code is not None: + span.set_attribute("db.response.status_code", str(error_code)) + + +def add_cursor_id(span: Optional[Span], cursor_id: int) -> None: + """Add cursor ID attribute to span if present.""" + if span is None or cursor_id == 0: + return + span.set_attribute("db.mongodb.cursor_id", cursor_id) diff --git a/pyproject.toml b/pyproject.toml index 9b3287834a..fab9719b15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ docs = ["requirements/docs.txt"] encryption = ["requirements/encryption.txt"] gssapi = ["requirements/gssapi.txt"] ocsp = ["requirements/ocsp.txt"] +opentelemetry = ["requirements/opentelemetry.txt"] snappy = ["requirements/snappy.txt"] test = ["requirements/test.txt"] zstd = ["requirements/zstd.txt"] diff --git a/uv.lock b/uv.lock index 78d0cc213f..c8eb154730 100644 --- a/uv.lock +++ b/uv.lock @@ -1424,6 +1424,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/1d/1b658dbd2b9fa9c4c9f32accbfc0205d532c8c6194dc0f2a4c0428e7128a/nodeenv-1.9.1-py2.py3-none-any.whl", hash = "sha256:ba11c9782d29c27c70ffbdda2d7415098754709be8a7056d79a737cd901155c9", size = 22314, upload-time = "2024-06-04T18:44:08.352Z" }, ] +[[package]] +name = "opentelemetry-api" +version = "1.39.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/97/b9/3161be15bb8e3ad01be8be5a968a9237c3027c5be504362ff800fca3e442/opentelemetry_api-1.39.1.tar.gz", hash = "sha256:fbde8c80e1b937a2c61f20347e91c0c18a1940cecf012d62e65a7caf08967c9c", size = 65767, upload-time = "2025-12-11T13:32:39.182Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, +] + [[package]] name = "packaging" version = "25.0" @@ -1545,6 +1558,9 @@ ocsp = [ { name = "requests" }, { name = "service-identity" }, ] +opentelemetry = [ + { name = "opentelemetry-api" }, +] snappy = [ { name = "python-snappy" }, ] @@ -1591,6 +1607,7 @@ requires-dist = [ { name = "dnspython", specifier = ">=2.6.1,<3.0.0" }, { name = "furo", marker = "extra == 'docs'", specifier = "==2025.12.19" }, { name = "importlib-metadata", marker = "python_full_version < '3.13' and extra == 'test'", specifier = ">=7.0" }, + { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.20.0" }, { name = "pykerberos", marker = "os_name != 'nt' and extra == 'gssapi'", specifier = ">=1.2.4" }, { name = "pymongo-auth-aws", marker = "extra == 'aws'", specifier = ">=1.1.0,<2.0.0" }, { name = "pymongo-auth-aws", marker = "extra == 'encryption'", specifier = ">=1.1.0,<2.0.0" }, @@ -1608,7 +1625,7 @@ requires-dist = [ { name = "sphinxcontrib-shellcheck", marker = "extra == 'docs'", specifier = ">=1,<2" }, { name = "winkerberos", marker = "os_name == 'nt' and extra == 'gssapi'", specifier = ">=0.5.0" }, ] -provides-extras = ["aws", "docs", "encryption", "gssapi", "ocsp", "snappy", "test", "zstd"] +provides-extras = ["aws", "docs", "encryption", "gssapi", "ocsp", "opentelemetry", "snappy", "test", "zstd"] [package.metadata.requires-dev] coverage = [{ name = "coverage", extras = ["toml"], specifier = ">=5,<=7.10.7" }] From cf8f23440ad0e3da6096780ee82bda749b7847d9 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 4 Mar 2026 12:45:25 -0800 Subject: [PATCH 2/6] Use command_telemetry --- pymongo/asynchronous/bulk.py | 204 +++++++------------------- pymongo/asynchronous/client_bulk.py | 215 ++++++++-------------------- pymongo/asynchronous/network.py | 10 -- pymongo/asynchronous/server.py | 194 ++++++++----------------- pymongo/synchronous/bulk.py | 204 +++++++------------------- pymongo/synchronous/client_bulk.py | 215 ++++++++-------------------- pymongo/synchronous/network.py | 10 -- pymongo/synchronous/server.py | 194 ++++++++----------------- pymongo/telemetry.py | 133 +++++++++++++---- pymongo/tracing.py | 132 ----------------- 10 files changed, 450 insertions(+), 1061 deletions(-) delete mode 100644 pymongo/tracing.py diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 4a54f9eb3f..029ac2fa82 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -57,18 +55,17 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, _UPDATE, _BulkWriteContext, - _convert_exception, _convert_write_result, _EncryptedBulkWriteContext, _randint, ) from pymongo.read_preferences import ReadPreference +from pymongo.telemetry import command_telemetry from pymongo.write_concern import WriteConcern if TYPE_CHECKING: @@ -252,78 +249,31 @@ async def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, docs) - try: - reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - await client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Process the response from the server. - if isinstance(exc, (NotPrimaryError, OperationFailure)): - await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - raise + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] + telemetry.publish_succeeded(reply) + await client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + await client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + raise return reply # type: ignore[return-value] async def unack_write( @@ -337,81 +287,33 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, docs) - try: - result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - raise + cmd[bwc.field] = docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) + except Exception as exc: + telemetry.publish_failed(exc) + raise return result # type: ignore[return-value] async def _execute_batch_unack( diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 151942c8a8..76a21cfac9 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -62,11 +60,9 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, - _convert_exception, _convert_write_result, _randint, ) @@ -77,6 +73,7 @@ InsertOneResult, UpdateResult, ) +from pymongo.telemetry import command_telemetry from pymongo.typings import _DocumentOut, _Pipeline from pymongo.write_concern import WriteConcern @@ -238,82 +235,35 @@ async def write_command( """A proxy for AsyncConnection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, op_docs, ns_docs) - try: - reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - # Process the response from the server. - await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} - # Process the response from the server. - if isinstance(exc, OperationFailure): - await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - else: - await self.client._process_response({}, bwc.session) # type: ignore[arg-type] + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] + telemetry.publish_succeeded(reply) + # Process the response from the server. + await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + await self.client._process_response({}, bwc.session) # type: ignore[arg-type] return reply # type: ignore[return-value] async def unack_write( @@ -327,82 +277,35 @@ async def unack_write( client: AsyncMongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for AsyncConnection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, op_docs, ns_docs) - try: - result = await bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} + cmd["ops"] = op_docs + cmd["nsInfo"] = ns_docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = await bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} return reply async def _execute_batch_unack( diff --git a/pymongo/asynchronous/network.py b/pymongo/asynchronous/network.py index b584dc8adb..d5aa98f3cb 100644 --- a/pymongo/asynchronous/network.py +++ b/pymongo/asynchronous/network.py @@ -37,7 +37,6 @@ async_sendall, ) from pymongo.telemetry import command_telemetry -from pymongo.tracing import add_cursor_id if TYPE_CHECKING: from bson import CodecOptions @@ -201,15 +200,6 @@ async def command( telemetry.publish_failed(exc) raise - # Add cursor_id to span if present in response - if telemetry.span is not None and isinstance(response_doc, dict): - cursor_info = response_doc.get("cursor") - if cursor_info and isinstance(cursor_info, dict): - cursor_id = cursor_info.get("id", 0) - if cursor_id: - add_cursor_id(telemetry.span, cursor_id) - - # Publish command succeeded event telemetry.publish_succeeded( reply=response_doc, speculative_hello=speculative_hello, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index f212306174..2fc0c13458 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -28,17 +28,15 @@ from bson import _decode_all_selective from pymongo.asynchronous.helpers import _handle_reauth -from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from queue import Queue @@ -170,140 +168,66 @@ async def run_operation( message = operation.get_message(read_preference, conn, use_cmd) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = await conn.receive_message(None) - else: - await conn.send_message(data, max_doc_size) - reply = await conn.receive_message(request_id) - - # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True - docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, - ) + if publish and "$db" not in cmd: + cmd["$db"] = dbn + + with command_telemetry( + command_name=operation.name, + database_name=dbn, + spec=cmd, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + address=conn.address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + if more_to_come: + reply = await conn.receive_message(None) + else: + await conn.send_message(data, max_doc_size) + reply = await conn.receive_message(request_id) + + # Unpack and check for command errors. + if use_cmd: + user_fields = _CURSOR_DOC_FIELDS + legacy_response = False + else: + user_fields = None + legacy_response = True + docs = unpack_res( + reply, + operation.cursor_id, + operation.codec_options, + legacy_response=legacy_response, + user_fields=user_fields, + ) + if use_cmd: + first = docs[0] + await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Must publish in find / getMore / explain command response format. if use_cmd: - first = docs[0] - await operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] + res = docs[0] + elif operation.name == "explain": + res = docs[0] if docs else {} else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise + res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] + if operation.name == "find": + res["cursor"]["firstBatch"] = docs + else: + res["cursor"]["nextBatch"] = docs + telemetry.publish_succeeded(res) + duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) # Decrypt response. client = operation.client # type: ignore[assignment] diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 22d6a7a76a..5fef80ec82 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -55,13 +53,11 @@ OperationFailure, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _DELETE, _INSERT, _UPDATE, _BulkWriteContext, - _convert_exception, _convert_write_result, _EncryptedBulkWriteContext, _randint, @@ -69,6 +65,7 @@ from pymongo.read_preferences import ReadPreference from pymongo.synchronous.client_session import ClientSession, _validate_session_write_concern from pymongo.synchronous.helpers import _handle_reauth +from pymongo.telemetry import command_telemetry from pymongo.write_concern import WriteConcern if TYPE_CHECKING: @@ -252,78 +249,31 @@ def write_command( ) -> dict[str, Any]: """A proxy for SocketInfo.write_command that handles event publishing.""" cmd[bwc.field] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, docs) - try: - reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Process the response from the server. - if isinstance(exc, (NotPrimaryError, OperationFailure)): - client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - raise + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] + telemetry.publish_succeeded(reply) + client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Process the response from the server. + if isinstance(exc, (NotPrimaryError, OperationFailure)): + client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + raise return reply # type: ignore[return-value] def unack_write( @@ -337,81 +287,33 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, docs) - try: - result = bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - raise + cmd[bwc.field] = docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) + except Exception as exc: + telemetry.publish_failed(exc) + raise return result # type: ignore[return-value] def _execute_batch_unack( diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index a606d028e1..f97bae021e 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -19,8 +19,6 @@ from __future__ import annotations import copy -import datetime -import logging from collections.abc import MutableMapping from itertools import islice from typing import ( @@ -62,11 +60,9 @@ WaitQueueTimeoutError, ) from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import ( _ClientBulkWriteContext, _convert_client_bulk_exception, - _convert_exception, _convert_write_result, _randint, ) @@ -77,6 +73,7 @@ InsertOneResult, UpdateResult, ) +from pymongo.telemetry import command_telemetry from pymongo.typings import _DocumentOut, _Pipeline from pymongo.write_concern import WriteConcern @@ -238,82 +235,35 @@ def write_command( """A proxy for Connection.write_command that handles event publishing.""" cmd["ops"] = op_docs cmd["nsInfo"] = ns_docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._start(cmd, request_id, op_docs, ns_docs) - try: - reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - duration = datetime.datetime.now() - bwc.start_time - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) # type: ignore[arg-type] - # Process the response from the server. - self.client._process_response(reply, bwc.session) # type: ignore[arg-type] - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - - if bwc.publish: - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} - # Process the response from the server. - if isinstance(exc, OperationFailure): - self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] - else: - self.client._process_response({}, bwc.session) # type: ignore[arg-type] + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] + telemetry.publish_succeeded(reply) + # Process the response from the server. + self.client._process_response(reply, bwc.session) # type: ignore[arg-type] + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} + # Process the response from the server. + if isinstance(exc, OperationFailure): + self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type] + else: + self.client._process_response({}, bwc.session) # type: ignore[arg-type] return reply # type: ignore[return-value] def unack_write( @@ -327,82 +277,35 @@ def unack_write( client: MongoClient[Any], ) -> Optional[Mapping[str, Any]]: """A proxy for Connection.unack_write that handles event publishing.""" - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - cmd = bwc._start(cmd, request_id, op_docs, ns_docs) - try: - result = bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] - duration = datetime.datetime.now() - bwc.start_time - if result is not None: - reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] - else: - # Comply with APM spec. - reply = {"ok": 1} - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=reply, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - ) - if bwc.publish: - bwc._succeed(request_id, reply, duration) - except Exception as exc: - duration = datetime.datetime.now() - bwc.start_time - if isinstance(exc, OperationFailure): - failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type] - elif isinstance(exc, NotPrimaryError): - failure = exc.details # type: ignore[assignment] - else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=bwc.db_name, - requestId=request_id, - operationId=request_id, - driverConnectionId=bwc.conn.id, - serverConnectionId=bwc.conn.server_connection_id, - serverHost=bwc.conn.address[0], - serverPort=bwc.conn.address[1], - serviceId=bwc.conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if bwc.publish: - assert bwc.start_time is not None - bwc._fail(request_id, failure, duration) - # Top-level error will be embedded in ClientBulkWriteException. - reply = {"error": exc} + cmd["ops"] = op_docs + cmd["nsInfo"] = ns_docs + with command_telemetry( + command_name=bwc.name, + database_name=bwc.db_name, + spec=cmd, + driver_connection_id=bwc.conn.id, + server_connection_id=bwc.conn.server_connection_id, + publish_event=bwc.publish, + start_time=bwc.start_time, + address=bwc.conn.address, + listeners=bwc.listeners, + client=client, + request_id=request_id, + service_id=bwc.conn.service_id, + operation_id=bwc.op_id, + ) as telemetry: + try: + result = bwc.conn.unack_write(msg, bwc.max_bson_size) # type: ignore[func-returns-value, misc, override] + if result is not None: + reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type] + else: + # Comply with APM spec. + reply = {"ok": 1} + telemetry.publish_succeeded(reply) + except Exception as exc: + telemetry.publish_failed(exc) + # Top-level error will be embedded in ClientBulkWriteException. + reply = {"error": exc} return reply def _execute_batch_unack( diff --git a/pymongo/synchronous/network.py b/pymongo/synchronous/network.py index 497535cb43..ccde5c7c80 100644 --- a/pymongo/synchronous/network.py +++ b/pymongo/synchronous/network.py @@ -37,7 +37,6 @@ sendall, ) from pymongo.telemetry import command_telemetry -from pymongo.tracing import add_cursor_id if TYPE_CHECKING: from bson import CodecOptions @@ -201,15 +200,6 @@ def command( telemetry.publish_failed(exc) raise - # Add cursor_id to span if present in response - if telemetry.span is not None and isinstance(response_doc, dict): - cursor_info = response_doc.get("cursor") - if cursor_info and isinstance(cursor_info, dict): - cursor_id = cursor_info.get("id", 0) - if cursor_id: - add_cursor_id(telemetry.span, cursor_id) - - # Publish command succeeded event telemetry.publish_succeeded( reply=response_doc, speculative_hello=speculative_hello, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index f57420918b..6d0dcc612f 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -27,18 +27,16 @@ ) from bson import _decode_all_selective -from pymongo.errors import NotPrimaryError, OperationFailure from pymongo.helpers_shared import _check_command_response from pymongo.logger import ( - _COMMAND_LOGGER, _SDAM_LOGGER, - _CommandStatusMessage, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _convert_exception, _GetMore, _OpMsg, _Query +from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.helpers import _handle_reauth +from pymongo.telemetry import command_telemetry if TYPE_CHECKING: from queue import Queue @@ -170,140 +168,66 @@ def run_operation( message = operation.get_message(read_preference, conn, use_cmd) request_id, data, max_doc_size = self._split_message(message) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - - if publish: - if "$db" not in cmd: - cmd["$db"] = dbn - assert listeners is not None - listeners.publish_command_start( - cmd, - dbn, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - ) - - try: - if more_to_come: - reply = conn.receive_message(None) - else: - conn.send_message(data, max_doc_size) - reply = conn.receive_message(request_id) - - # Unpack and check for command errors. - if use_cmd: - user_fields = _CURSOR_DOC_FIELDS - legacy_response = False - else: - user_fields = None - legacy_response = True - docs = unpack_res( - reply, - operation.cursor_id, - operation.codec_options, - legacy_response=legacy_response, - user_fields=user_fields, - ) + if publish and "$db" not in cmd: + cmd["$db"] = dbn + + with command_telemetry( + command_name=operation.name, + database_name=dbn, + spec=cmd, + driver_connection_id=conn.id, + server_connection_id=conn.server_connection_id, + publish_event=publish, + start_time=start, + address=conn.address, + listeners=listeners, + client=client, + request_id=request_id, + service_id=conn.service_id, + ) as telemetry: + try: + if more_to_come: + reply = conn.receive_message(None) + else: + conn.send_message(data, max_doc_size) + reply = conn.receive_message(request_id) + + # Unpack and check for command errors. + if use_cmd: + user_fields = _CURSOR_DOC_FIELDS + legacy_response = False + else: + user_fields = None + legacy_response = True + docs = unpack_res( + reply, + operation.cursor_id, + operation.codec_options, + legacy_response=legacy_response, + user_fields=user_fields, + ) + if use_cmd: + first = docs[0] + operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] + _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] + except Exception as exc: + telemetry.publish_failed(exc) + raise + + # Must publish in find / getMore / explain command response format. if use_cmd: - first = docs[0] - operation.client._process_response(first, operation.session) # type: ignore[misc, arg-type] - _check_command_response(first, conn.max_wire_version, pool_opts=conn.opts) # type:ignore[has-type] - except Exception as exc: - duration = datetime.now() - start - if isinstance(exc, (NotPrimaryError, OperationFailure)): - failure: _DocumentOut = exc.details # type: ignore[assignment] + res = docs[0] + elif operation.name == "explain": + res = docs[0] if docs else {} else: - failure = _convert_exception(exc) - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - listeners.publish_command_failure( - duration, - failure, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) - raise + res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] + if operation.name == "find": + res["cursor"]["firstBatch"] = docs + else: + res["cursor"]["nextBatch"] = docs + telemetry.publish_succeeded(res) + duration = datetime.now() - start - # Must publish in find / getMore / explain command response - # format. - if use_cmd: - res = docs[0] - elif operation.name == "explain": - res = docs[0] if docs else {} - else: - res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr] - if operation.name == "find": - res["cursor"]["firstBatch"] = docs - else: - res["cursor"]["nextBatch"] = docs - if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=res, - commandName=next(iter(cmd)), - databaseName=dbn, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - listeners.publish_command_success( - duration, - res, - operation.name, - request_id, - conn.address, - conn.server_connection_id, - service_id=conn.service_id, - database_name=dbn, - ) # Decrypt response. client = operation.client # type: ignore[assignment] diff --git a/pymongo/telemetry.py b/pymongo/telemetry.py index e909c08995..1c31c81662 100644 --- a/pymongo/telemetry.py +++ b/pymongo/telemetry.py @@ -14,7 +14,7 @@ """Unified telemetry support for PyMongo. -Supports telemetry using standardized logging, event publishing, and OpenTelemetry. +Supports telemetry through standardized logging, event publishing, and OpenTelemetry. To enable OpenTelemetry logging, set the environment variable: OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED=true @@ -24,43 +24,109 @@ from __future__ import annotations import logging +import os from datetime import datetime from typing import TYPE_CHECKING, Any, Mapping, Optional from pymongo import message from pymongo.errors import NotPrimaryError, OperationFailure -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.logger import _COMMAND_LOGGER, _SENSITIVE_COMMANDS, _CommandStatusMessage, _debug_log from pymongo.monitoring import _EventListeners -from pymongo.tracing import ( - _build_query_summary, - _extract_collection_name, - _get_tracer, - _is_sensitive_command, -) try: - from opentelemetry import trace - from opentelemetry.trace import Span, SpanKind, Status, StatusCode + from opentelemetry import trace # type: ignore[import-not-found] + from opentelemetry.trace import ( # type: ignore[import-not-found] + Span, + SpanKind, + Status, + StatusCode, + Tracer, + ) _HAS_OPENTELEMETRY = True except ImportError: _HAS_OPENTELEMETRY = False - trace = None # type: ignore[assignment] - Span = None # type: ignore[assignment, misc] - SpanKind = None # type: ignore[assignment, misc] - Status = None # type: ignore[assignment, misc] - StatusCode = None # type: ignore[assignment, misc] + trace = None + Span = None + SpanKind = None + Status = None + StatusCode = None + Tracer = None if TYPE_CHECKING: from pymongo.typings import _Address, _AgnosticMongoClient, _DocumentOut +# Environment variable names +_OTEL_ENABLED_ENV = "OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED" + + +def _is_tracing_enabled() -> bool: + """Check if tracing is enabled via environment variable.""" + if not _HAS_OPENTELEMETRY: + return False + value = os.environ.get(_OTEL_ENABLED_ENV, "").lower() + return value in ("1", "true") + + +def _get_tracer() -> Optional[Tracer]: + """Get the PyMongo tracer instance.""" + if not _HAS_OPENTELEMETRY or not _is_tracing_enabled(): + return None + from pymongo._version import __version__ + + return trace.get_tracer("PyMongo", __version__) + + +def _is_sensitive_command(command_name: str) -> bool: + """Check if a command is sensitive and should not be traced.""" + return command_name.lower() in _SENSITIVE_COMMANDS + + +def _build_query_summary( + command_name: str, + database_name: str, + collection_name: Optional[str], +) -> str: + """Build the db.query.summary attribute value.""" + if collection_name: + return f"{command_name} {database_name}.{collection_name}" + return f"{command_name} {database_name}" + + +def _extract_collection_name(spec: Mapping[str, Any]) -> Optional[str]: + """Extract collection name from command spec if applicable.""" + if not spec: + return None + cmd_name = next(iter(spec)).lower() + # Commands where the first value is the collection name + if cmd_name in ( + "insert", + "update", + "delete", + "find", + "aggregate", + "findandmodify", + "count", + "distinct", + "create", + "drop", + "createindexes", + "dropindexes", + "listindexes", + ): + value = spec.get(next(iter(spec))) + if isinstance(value, str): + return value + return None + + class _CommandTelemetry: """Manages telemetry for MongoDB commands, including logging, event publishing, and OpenTelemetry spans. This class is a context manager that handles the full lifecycle of command telemetry: - - On entry (__enter__): Sets up OpenTelemetry span and publishes the started event - - On exit (__exit__): Cleans up the span context (caller handles success/failure publishing) + - On entry: sets up OpenTelemetry span and publishes the started event + - On exit: cleans up the span context (caller handles success/failure publishing) """ __slots__ = ( @@ -75,6 +141,7 @@ class _CommandTelemetry: "_listeners", "_client", "_request_id", + "_operation_id", "_service_id", "_span", "_span_context", @@ -89,11 +156,12 @@ def __init__( server_connection_id: Optional[int], publish_event: bool, start_time: datetime, - address: Optional[_Address], + address: _Address, listeners: Optional[_EventListeners], client: Optional[_AgnosticMongoClient], - request_id: Optional[int], + request_id: int, service_id: Optional[Any], + operation_id: Optional[int] = None, ): self._command_name = command_name self._database_name = database_name @@ -106,6 +174,7 @@ def __init__( self._listeners = listeners self._client = client self._request_id = request_id + self._operation_id = operation_id if operation_id is not None else request_id self._service_id = service_id self._span: Optional[Span] = None self._span_context: Optional[Any] = None @@ -177,7 +246,7 @@ def publish_started(self) -> None: commandName=next(iter(self._spec)), databaseName=self._database_name, requestId=self._request_id, - operationId=self._request_id, + operationId=self._operation_id, driverConnectionId=self._driver_connection_id, serverConnectionId=self._server_connection_id, serverHost=self._address[0] if self._address else None, @@ -188,11 +257,12 @@ def publish_started(self) -> None: assert self._listeners is not None assert self._address is not None self._listeners.publish_command_start( - self._spec, + self._spec, # type: ignore[arg-type] self._database_name, self._request_id, self._address, self._server_connection_id, + op_id=self._operation_id, service_id=self._service_id, ) @@ -204,6 +274,15 @@ def publish_succeeded( ) -> None: """Publish command succeeded event and log.""" duration = datetime.now() - self._start_time + + # Add cursor_id to span if present in response + if self._span is not None and isinstance(reply, dict): + cursor_info = reply.get("cursor") + if cursor_info and isinstance(cursor_info, dict): + cursor_id = cursor_info.get("id", 0) + if cursor_id: + self._span.set_attribute("db.mongodb.cursor_id", cursor_id) + if self._client is not None: if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -215,7 +294,7 @@ def publish_succeeded( commandName=next(iter(self._spec)), databaseName=self._database_name, requestId=self._request_id, - operationId=self._request_id, + operationId=self._operation_id, driverConnectionId=self._driver_connection_id, serverConnectionId=self._server_connection_id, serverHost=self._address[0] if self._address else None, @@ -233,6 +312,7 @@ def publish_succeeded( self._request_id, self._address, self._server_connection_id, + op_id=self._operation_id, service_id=self._service_id, speculative_hello=speculative_hello, database_name=self._database_name, @@ -264,7 +344,7 @@ def publish_failed(self, exc: Exception) -> None: commandName=next(iter(self._spec)), databaseName=self._database_name, requestId=self._request_id, - operationId=self._request_id, + operationId=self._operation_id, driverConnectionId=self._driver_connection_id, serverConnectionId=self._server_connection_id, serverHost=self._address[0] if self._address else None, @@ -282,6 +362,7 @@ def publish_failed(self, exc: Exception) -> None: self._request_id, self._address, self._server_connection_id, + op_id=self._operation_id, service_id=self._service_id, database_name=self._database_name, ) @@ -295,11 +376,12 @@ def command_telemetry( server_connection_id: Optional[int], publish_event: bool, start_time: datetime, - address: Optional[_Address] = None, + request_id: int, + address: _Address, listeners: Optional[_EventListeners] = None, client: Optional[_AgnosticMongoClient] = None, - request_id: Optional[int] = None, service_id: Optional[Any] = None, + operation_id: Optional[int] = None, ) -> _CommandTelemetry: """Create a _CommandTelemetry context manager for command telemetry. @@ -336,4 +418,5 @@ def command_telemetry( client=client, request_id=request_id, service_id=service_id, + operation_id=operation_id, ) diff --git a/pymongo/tracing.py b/pymongo/tracing.py deleted file mode 100644 index 3c89a69ee9..0000000000 --- a/pymongo/tracing.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2026-present MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""OpenTelemetry tracing support for PyMongo. - -This module provides optional OpenTelemetry tracing for MongoDB commands. -Tracing is disabled by default and requires the opentelemetry-api package. - -To enable tracing, set the environment variable: - OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED=true - -.. versionadded:: 4.x -""" -from __future__ import annotations - -import os -from typing import TYPE_CHECKING, Any, Mapping, Optional - -from pymongo.logger import _SENSITIVE_COMMANDS - -try: - from opentelemetry import trace - from opentelemetry.trace import Span, SpanKind, Status, StatusCode - - _HAS_OPENTELEMETRY = True -except ImportError: - _HAS_OPENTELEMETRY = False - trace = None # type: ignore[assignment] - Span = None # type: ignore[assignment, misc] - SpanKind = None # type: ignore[assignment, misc] - Status = None # type: ignore[assignment, misc] - StatusCode = None # type: ignore[assignment, misc] - -if TYPE_CHECKING: - from opentelemetry.trace import Tracer - -# Environment variable names -_OTEL_ENABLED_ENV = "OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED" - - -def _is_tracing_enabled() -> bool: - """Check if tracing is enabled via environment variable.""" - if not _HAS_OPENTELEMETRY: - return False - value = os.environ.get(_OTEL_ENABLED_ENV, "").lower() - return value in ("1", "true") - - -def _get_tracer() -> Optional[Tracer]: - """Get the PyMongo tracer instance.""" - if not _HAS_OPENTELEMETRY or not _is_tracing_enabled(): - return None - from pymongo._version import __version__ - - return trace.get_tracer("PyMongo", __version__) - - -def _is_sensitive_command(command_name: str) -> bool: - """Check if a command is sensitive and should not be traced.""" - return command_name.lower() in _SENSITIVE_COMMANDS - - -def _build_query_summary( - command_name: str, - database_name: str, - collection_name: Optional[str], -) -> str: - """Build the db.query.summary attribute value.""" - if collection_name: - return f"{command_name} {database_name}.{collection_name}" - return f"{command_name} {database_name}" - - -def _extract_collection_name(spec: Mapping[str, Any]) -> Optional[str]: - """Extract collection name from command spec if applicable.""" - if not spec: - return None - cmd_name = next(iter(spec)).lower() - # Commands where the first value is the collection name - if cmd_name in ( - "insert", - "update", - "delete", - "find", - "aggregate", - "findandmodify", - "count", - "distinct", - "create", - "drop", - "createindexes", - "dropindexes", - "listindexes", - ): - value = spec.get(next(iter(spec))) - if isinstance(value, str): - return value - return None - - -def record_command_exception( - span: Optional[Span], - exception: BaseException, - error_code: Optional[int] = None, -) -> None: - """Record an exception on a command span.""" - if span is None or not _HAS_OPENTELEMETRY: - return - - span.record_exception(exception) - span.set_status(Status(StatusCode.ERROR, str(exception))) - - if error_code is not None: - span.set_attribute("db.response.status_code", str(error_code)) - - -def add_cursor_id(span: Optional[Span], cursor_id: int) -> None: - """Add cursor ID attribute to span if present.""" - if span is None or cursor_id == 0: - return - span.set_attribute("db.mongodb.cursor_id", cursor_id) From ba530d3c517298b4f4439bfb481fdb7f76d558b4 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 4 Mar 2026 12:50:31 -0800 Subject: [PATCH 3/6] Typing --- pymongo/synchronous/bulk.py | 2 +- pymongo/synchronous/client_bulk.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 5fef80ec82..69fb28e8b9 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -266,7 +266,7 @@ def write_command( ) as telemetry: try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: telemetry.publish_failed(exc) diff --git a/pymongo/synchronous/client_bulk.py b/pymongo/synchronous/client_bulk.py index f97bae021e..090a1c1d70 100644 --- a/pymongo/synchronous/client_bulk.py +++ b/pymongo/synchronous/client_bulk.py @@ -252,7 +252,7 @@ def write_command( ) as telemetry: try: reply = bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] # Process the response from the server. self.client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: @@ -301,7 +301,7 @@ def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] except Exception as exc: telemetry.publish_failed(exc) # Top-level error will be embedded in ClientBulkWriteException. From abdfb274e44dd9f7eafb27279fd1d4d31a10114c Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 4 Mar 2026 12:54:44 -0800 Subject: [PATCH 4/6] Typing --- pymongo/asynchronous/bulk.py | 4 ++-- pymongo/asynchronous/client_bulk.py | 4 ++-- pymongo/synchronous/bulk.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pymongo/asynchronous/bulk.py b/pymongo/asynchronous/bulk.py index 029ac2fa82..1ff6475673 100644 --- a/pymongo/asynchronous/bulk.py +++ b/pymongo/asynchronous/bulk.py @@ -266,7 +266,7 @@ async def write_command( ) as telemetry: try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc] - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] await client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: telemetry.publish_failed(exc) @@ -310,7 +310,7 @@ async def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] except Exception as exc: telemetry.publish_failed(exc) raise diff --git a/pymongo/asynchronous/client_bulk.py b/pymongo/asynchronous/client_bulk.py index 76a21cfac9..d978599c24 100644 --- a/pymongo/asynchronous/client_bulk.py +++ b/pymongo/asynchronous/client_bulk.py @@ -252,7 +252,7 @@ async def write_command( ) as telemetry: try: reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc, arg-type] - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] # Process the response from the server. await self.client._process_response(reply, bwc.session) # type: ignore[arg-type] except Exception as exc: @@ -301,7 +301,7 @@ async def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] except Exception as exc: telemetry.publish_failed(exc) # Top-level error will be embedded in ClientBulkWriteException. diff --git a/pymongo/synchronous/bulk.py b/pymongo/synchronous/bulk.py index 69fb28e8b9..1fe6636646 100644 --- a/pymongo/synchronous/bulk.py +++ b/pymongo/synchronous/bulk.py @@ -310,7 +310,7 @@ def unack_write( else: # Comply with APM spec. reply = {"ok": 1} - telemetry.publish_succeeded(reply) + telemetry.publish_succeeded(reply) # type: ignore[misc, arg-type] except Exception as exc: telemetry.publish_failed(exc) raise From d6bc364c60b24b4e51548ffac28dcbcb1b50e8da Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 4 Mar 2026 13:16:58 -0800 Subject: [PATCH 5/6] Add requirements/opentelemetry.txt --- requirements/opentelemetry.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 requirements/opentelemetry.txt diff --git a/requirements/opentelemetry.txt b/requirements/opentelemetry.txt new file mode 100644 index 0000000000..d20388d07f --- /dev/null +++ b/requirements/opentelemetry.txt @@ -0,0 +1 @@ +opentelemetry-api>=1.20.0 From d1f3f4d519a619e980ba729b7010a8ae3a2fb740 Mon Sep 17 00:00:00 2001 From: Noah Stapp Date: Wed, 4 Mar 2026 13:37:25 -0800 Subject: [PATCH 6/6] Docstring cleanup --- pymongo/telemetry.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/pymongo/telemetry.py b/pymongo/telemetry.py index 1c31c81662..b1308d93c9 100644 --- a/pymongo/telemetry.py +++ b/pymongo/telemetry.py @@ -57,12 +57,10 @@ from pymongo.typings import _Address, _AgnosticMongoClient, _DocumentOut -# Environment variable names _OTEL_ENABLED_ENV = "OTEL_PYTHON_INSTRUMENTATION_MONGODB_ENABLED" def _is_tracing_enabled() -> bool: - """Check if tracing is enabled via environment variable.""" if not _HAS_OPENTELEMETRY: return False value = os.environ.get(_OTEL_ENABLED_ENV, "").lower() @@ -70,7 +68,6 @@ def _is_tracing_enabled() -> bool: def _get_tracer() -> Optional[Tracer]: - """Get the PyMongo tracer instance.""" if not _HAS_OPENTELEMETRY or not _is_tracing_enabled(): return None from pymongo._version import __version__ @@ -79,7 +76,6 @@ def _get_tracer() -> Optional[Tracer]: def _is_sensitive_command(command_name: str) -> bool: - """Check if a command is sensitive and should not be traced.""" return command_name.lower() in _SENSITIVE_COMMANDS @@ -125,7 +121,7 @@ class _CommandTelemetry: """Manages telemetry for MongoDB commands, including logging, event publishing, and OpenTelemetry spans. This class is a context manager that handles the full lifecycle of command telemetry: - - On entry: sets up OpenTelemetry span and publishes the started event + - On entry: sets up OpenTelemetry span (if enabled) and publishes the started event and/or log - On exit: cleans up the span context (caller handles success/failure publishing) """ @@ -180,7 +176,6 @@ def __init__( self._span_context: Optional[Any] = None def __enter__(self) -> _CommandTelemetry: - """Enter the telemetry context: set up span and publish started event.""" self._setup_span() self.publish_started() return self @@ -191,7 +186,6 @@ def __exit__( exc_val: Optional[BaseException], exc_tb: Optional[Any], ) -> None: - """Exit the telemetry context: clean up span context.""" if self._span_context is not None: self._span_context.__exit__(exc_type, exc_val, exc_tb) @@ -213,7 +207,6 @@ def _setup_span(self) -> None: ) self._span = self._span_context.__enter__() - # Set span attributes self._span.set_attribute("db.system", "mongodb") self._span.set_attribute("db.namespace", self._database_name) self._span.set_attribute("db.command.name", self._command_name) @@ -235,7 +228,7 @@ def span(self) -> Optional[Span]: return self._span def publish_started(self) -> None: - """Publish command started event and log.""" + """Publish command started event and log if enabled.""" if self._client is not None: if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): _debug_log( @@ -272,7 +265,7 @@ def publish_succeeded( speculative_hello: bool = False, speculative_authenticate: bool = False, ) -> None: - """Publish command succeeded event and log.""" + """Publish command succeeded event and log if enabled.""" duration = datetime.now() - self._start_time # Add cursor_id to span if present in response @@ -319,7 +312,7 @@ def publish_succeeded( ) def publish_failed(self, exc: Exception) -> None: - """Publish command failed event and log.""" + """Publish command failed event and log if enabled.""" duration = datetime.now() - self._start_time if isinstance(exc, (NotPrimaryError, OperationFailure)): failure: _DocumentOut = exc.details # type: ignore[assignment] @@ -388,7 +381,7 @@ def command_telemetry( Returns a _CommandTelemetry instance that should be used as a context manager. The context manager automatically: - Sets up OpenTelemetry span if tracing is enabled and command is not sensitive - - Publishes the started event on entry + - Publishes the started event and/or log on entry if enabled - Cleans up the span context on exit The caller is responsible for calling publish_succeeded() on successful completion