diff --git a/sqlserver/changelog.d/23533.fixed b/sqlserver/changelog.d/23533.fixed new file mode 100644 index 0000000000000..ad1f8feea2c95 --- /dev/null +++ b/sqlserver/changelog.d/23533.fixed @@ -0,0 +1 @@ +Fix Azure SQL Database and Azure SQL Managed Instance schema collection by using database compatibility level for schema query selection. \ No newline at end of file diff --git a/sqlserver/changelog.d/23544.fixed b/sqlserver/changelog.d/23544.fixed new file mode 100644 index 0000000000000..9ddf1f35c3510 --- /dev/null +++ b/sqlserver/changelog.d/23544.fixed @@ -0,0 +1 @@ +Reuse the auxiliary SQL Server schema collection connection for legacy table detail queries. \ No newline at end of file diff --git a/sqlserver/datadog_checks/sqlserver/queries.py b/sqlserver/datadog_checks/sqlserver/queries.py index ef4f311c62c36..f41137464cc3e 100644 --- a/sqlserver/datadog_checks/sqlserver/queries.py +++ b/sqlserver/datadog_checks/sqlserver/queries.py @@ -7,7 +7,8 @@ # `{}` is replaced with comma-separated ODBC `?` placeholders (values bound via parameters). DB_QUERY = """ SELECT - db.database_id AS id, db.name AS name, db.collation_name AS collation, dp.name AS owner + db.database_id AS id, db.name AS name, db.collation_name AS collation, dp.name AS owner, + db.compatibility_level AS compatibility_level FROM sys.databases db LEFT JOIN sys.database_principals dp ON db.owner_sid = dp.sid WHERE db.name IN ({}); diff --git a/sqlserver/datadog_checks/sqlserver/schemas.py b/sqlserver/datadog_checks/sqlserver/schemas.py index def952e15ef66..ed1c8bd24110e 100644 --- a/sqlserver/datadog_checks/sqlserver/schemas.py +++ b/sqlserver/datadog_checks/sqlserver/schemas.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, TypedDict from datadog_checks.base.utils.serialization import json -from datadog_checks.sqlserver.utils import construct_use_statement, execute_query +from datadog_checks.sqlserver.utils import construct_use_statement, execute_query, is_azure_database if TYPE_CHECKING: from datadog_checks.sqlserver import SQLServer @@ -16,6 +16,7 @@ from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig from datadog_checks.sqlserver.const import ( DEFAULT_SCHEMAS_COLLECTION_INTERVAL, + STATIC_INFO_ENGINE_EDITION, STATIC_INFO_MAJOR_VERSION, ) from datadog_checks.sqlserver.queries import ( @@ -32,6 +33,8 @@ KEY_PREFIX = "dbm-schemas-" KEY_PREFIX_PRE_2017 = "dbm-schemas-pre-2017" +# The modern schema query uses database-scoped JSON output, which requires SQL Server 2016 compatibility. +MINIMUM_JSON_COMPATIBILITY_LEVEL = 130 class DatabaseInfo(TypedDict): @@ -39,6 +42,7 @@ class DatabaseInfo(TypedDict): id: str collation: str owner: str + compatibility_level: str # The schema collector sends lists of DatabaseObjects to the agent @@ -81,22 +85,15 @@ def __init__(self, check: SQLServer): ) config.max_tables = check._config.schema_config.get('max_tables', 300) self._is_2016_or_earlier = None + self._database_compatibility_levels: dict[str, int] = {} + self._pre_2017_cursor = None super().__init__(check, config) - def collect_schemas(self): - # We wait until collect is called to check for static information - major_version = int(self._check.static_info_cache.get(STATIC_INFO_MAJOR_VERSION) or 0) - if major_version == 0: - self._check.log.debug("major_version is not available yet, defaulting to 2016 or earlier") - self._is_2016_or_earlier = major_version <= 13 - - super().collect_schemas() - @property def kind(self): return "sqlserver_databases" - def _get_databases(self): + def _get_databases(self) -> list[DatabaseInfo]: database_names = self._check.get_databases() with self._check.connection.open_managed_default_connection(KEY_PREFIX): with self._check.connection.get_managed_cursor(KEY_PREFIX) as cursor: @@ -104,17 +101,69 @@ def _get_databases(self): return [] placeholders = ",".join(["?"] * len(database_names)) query = DB_QUERY.format(placeholders) - return execute_query(query, cursor, convert_results_to_str=True, parameters=tuple(database_names)) + databases = execute_query(query, cursor, convert_results_to_str=True, parameters=tuple(database_names)) + self._record_database_compatibility_levels(databases) + return databases @contextlib.contextmanager def _get_cursor(self, database_name): - with self._check.connection.open_managed_default_connection(KEY_PREFIX): - with self._check.connection.get_managed_cursor(KEY_PREFIX) as cursor: + with contextlib.ExitStack() as stack: + try: + stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX)) + cursor = stack.enter_context(self._check.connection.get_managed_cursor(KEY_PREFIX)) switch_db_statement = construct_use_statement(database_name) cursor.execute(switch_db_statement) + self._is_2016_or_earlier = self._should_use_legacy_schema_query(database_name) + + if self._is_2016_or_earlier: + stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017)) + self._pre_2017_cursor = stack.enter_context( + self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017) + ) + self._pre_2017_cursor.execute(switch_db_statement) + query = self._get_tables_query() cursor.execute(query) yield cursor + finally: + self._pre_2017_cursor = None + + def _record_database_compatibility_levels(self, databases: list[DatabaseInfo]) -> None: + self._database_compatibility_levels = {} + for database in databases: + self._database_compatibility_levels[database["name"]] = int(database["compatibility_level"]) + + def _should_use_legacy_schema_query(self, database_name: str) -> bool: + """ + Return whether the current database needs the legacy schema query. + + The modern schema query depends on two SQL Server features: + - STRING_AGG, used to build index column lists. On self-managed SQL Server, this requires SQL Server 2017 + or later. Azure SQL Database and Azure SQL Managed Instance report ProductMajorVersion 12 while still + supporting STRING_AGG, so only self-managed SQL Server uses this version gate. + - JSON output, used for column, index, and foreign key metadata. This is controlled by each database's + compatibility_level and requires level 130 or higher. + + If ProductMajorVersion is missing, use the legacy query because we cannot confirm that STRING_AGG is + available. + """ + engine_edition = self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION) + if not is_azure_database(engine_edition): + major_version = int(self._check.static_info_cache.get(STATIC_INFO_MAJOR_VERSION) or 0) + if major_version == 0: + self._check.log.debug("major_version is not available yet, using legacy schema query") + return True + if major_version <= 13: + return True + + compatibility_level = self._database_compatibility_levels.get(database_name) + if compatibility_level is None: + self._check.log.debug( + "compatibility_level is not available for SQL Server database %s, using pre-2017 schema query", + database_name, + ) + return True + return compatibility_level < MINIMUM_JSON_COMPATIBILITY_LEVEL def _get_tables_query(self): limit = int(self._config.max_tables or 1_000_000) @@ -169,25 +218,25 @@ def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: object = super()._map_row(database, cursor_row) if self._is_2016_or_earlier: # We need to fetch the related data for each table - # Use a key_prefix to get a separate connection to avoid conflicts with the main connection - with self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017): - with self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017) as cursor: - switch_db_statement = construct_use_statement(database.get("name")) - cursor.execute(switch_db_statement) - table_id = str(cursor_row.get("table_id")) - columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id) - cursor.execute(columns_query) - columns = cursor.fetchall_dict() - indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id) - cursor.execute(indexes_query) - indexes = cursor.fetchall_dict() - foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id) - cursor.execute(foreign_keys_query) - foreign_keys = cursor.fetchall_dict() - partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id) - cursor.execute(partitions_query) - partition_row = cursor.fetchone_dict() - partition_count = partition_row.get("partition_count") if partition_row else None + # Use a separate connection to avoid conflicts with the main cursor while it streams table rows. + cursor = self._pre_2017_cursor + if cursor is None: + raise RuntimeError("pre-2017 schema cursor is not initialized") + + table_id = str(cursor_row.get("table_id")) + columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id) + cursor.execute(columns_query) + columns = cursor.fetchall_dict() + indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id) + cursor.execute(indexes_query) + indexes = cursor.fetchall_dict() + foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id) + cursor.execute(foreign_keys_query) + foreign_keys = cursor.fetchall_dict() + partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id) + cursor.execute(partitions_query) + partition_row = cursor.fetchone_dict() + partition_count = partition_row.get("partition_count") if partition_row else None else: columns = json.loads(cursor_row.get("columns") or "[]") indexes = json.loads(cursor_row.get("indexes") or "[]") diff --git a/sqlserver/tests/test_metadata.py b/sqlserver/tests/test_metadata.py index d979e2cf0b886..e71a31a804bf3 100644 --- a/sqlserver/tests/test_metadata.py +++ b/sqlserver/tests/test_metadata.py @@ -21,6 +21,11 @@ pyodbc = None +def normalize_compatibility_level(actual_payload): + assert actual_payload['compatibility_level'].isdigit() + actual_payload['compatibility_level'] = 'normalized_value' + + @pytest.fixture def dbm_instance(instance_docker): instance_docker['dbm'] = True @@ -101,6 +106,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): 'name': 'datadog_test_schemas_second', "collation": "SQL_Latin1_General_CP1_CI_AS", 'owner': 'dbo', + 'compatibility_level': 'normalized_value', 'schemas': [ { 'name': 'dbo', @@ -147,6 +153,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): 'name': 'datadog_test_schemas', "collation": "SQL_Latin1_General_CP1_CI_AS", 'owner': 'dbo', + 'compatibility_level': 'normalized_value', 'schemas': [ { 'name': 'test_schema', @@ -373,6 +380,8 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): assert db_name in databases_to_find # id's are env dependant normalize_ids(actual_payload) + # compatibility_level varies by SQL Server version + normalize_compatibility_level(actual_payload) # index columns may be in any order normalize_indexes_columns(actual_payload) matches = deep_compare(actual_payload, expected_data_for_db[db_name]) diff --git a/sqlserver/tests/test_schemas.py b/sqlserver/tests/test_schemas.py index f75559d27b21d..d0e3b9dd5db29 100644 --- a/sqlserver/tests/test_schemas.py +++ b/sqlserver/tests/test_schemas.py @@ -4,6 +4,7 @@ import json from typing import Callable, Optional +from unittest import mock import pytest @@ -15,9 +16,12 @@ pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] +SCHEMA_DATABASE = 'datadog_test_schemas' + @pytest.fixture def dbm_instance(instance_docker): + instance_docker['database'] = SCHEMA_DATABASE instance_docker['dbm'] = True instance_docker['min_collection_interval'] = 0.1 instance_docker['query_samples'] = {'enabled': False} @@ -47,11 +51,17 @@ def _check(instance: dict, init_config: dict = None): c.cancel() +def create_schema_collector(check: SQLServer) -> SQLServerSchemaCollector: + collector = SQLServerSchemaCollector(check) + collector._get_databases() + return collector + + def test_get_cursor(dbm_instance, integration_check): check = integration_check(dbm_instance) - collector = SQLServerSchemaCollector(check) + collector = create_schema_collector(check) - with collector._get_cursor('datadog_test_schemas') as cursor: + with collector._get_cursor(SCHEMA_DATABASE) as cursor: assert cursor is not None schemas = [] rows = cursor.fetchall_dict() @@ -65,9 +75,9 @@ def test_get_cursor(dbm_instance, integration_check): def test_tables(dbm_instance, integration_check): check = integration_check(dbm_instance) - collector = SQLServerSchemaCollector(check) + collector = create_schema_collector(check) - with collector._get_cursor('datadog_test_schemas') as cursor: + with collector._get_cursor(SCHEMA_DATABASE) as cursor: assert cursor is not None tables = [] rows = cursor.fetchall_dict() @@ -80,13 +90,12 @@ def test_tables(dbm_instance, integration_check): def test_columns(dbm_instance, integration_check): check = integration_check(dbm_instance) - collector = SQLServerSchemaCollector(check) + collector = create_schema_collector(check) - with collector._get_cursor('datadog_test_schemas') as cursor: + with collector._get_cursor(SCHEMA_DATABASE) as cursor: assert cursor is not None # Assert that at least one row has columns rows = cursor.fetchall_dict() - print(rows) assert any(row['columns'] for row in rows) for row in rows: if row['columns']: @@ -101,9 +110,9 @@ def test_columns(dbm_instance, integration_check): def test_indexes(dbm_instance, integration_check): check = integration_check(dbm_instance) - collector = SQLServerSchemaCollector(check) + collector = create_schema_collector(check) - with collector._get_cursor('datadog_test_schemas') as cursor: + with collector._get_cursor(SCHEMA_DATABASE) as cursor: assert cursor is not None # Assert that at least one row has indexes rows = cursor.fetchall_dict() @@ -139,3 +148,38 @@ def test_collect_schemas_pre_2017(dbm_instance, integration_check): collector = SQLServerSchemaCollector(check) collector.collect_schemas() + + +def test_collect_schemas_pre_2017_collects_table_details(dbm_instance, integration_check) -> None: + check = integration_check(dbm_instance) + check.static_info_cache[STATIC_INFO_MAJOR_VERSION] = 13 + collector = SQLServerSchemaCollector(check) + + with mock.patch.object(check, "database_monitoring_metadata") as database_monitoring_metadata: + collector.collect_schemas() + + assert collector._is_2016_or_earlier is True + + tables = {} + for call in database_monitoring_metadata.call_args_list: + event = json.loads(call.args[0]) + for database in event["metadata"]: + if database["name"] != SCHEMA_DATABASE: + continue + for schema in database["schemas"]: + for table in schema["tables"]: + tables[table["name"]] = table + + assert set(tables) == {'cities', 'Restaurants', 'RestaurantReviews', 'landmarks'} + + cities = tables["cities"] + assert {column["name"] for column in cities["columns"]} == {"id", "name", "population"} + assert {index["name"] for index in cities["indexes"]} == { + "PK_Cities", + "single_column_index", + "two_columns_index", + } + assert cities["partitions"]["partition_count"] > 0 + + landmarks = tables["landmarks"] + assert any(foreign_key["foreign_key_name"] == "FK_CityId" for foreign_key in landmarks["foreign_keys"]) diff --git a/sqlserver/tests/test_unit.py b/sqlserver/tests/test_unit.py index 75cd4e6d9f28a..21e43ecd686aa 100644 --- a/sqlserver/tests/test_unit.py +++ b/sqlserver/tests/test_unit.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2018-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import contextlib import copy import logging import os @@ -14,6 +15,9 @@ from datadog_checks.sqlserver import SQLServer from datadog_checks.sqlserver.connection import split_sqlserver_host_port from datadog_checks.sqlserver.const import ( + ENGINE_EDITION_AZURE_MANAGED_INSTANCE, + ENGINE_EDITION_SQL_DATABASE, + ENGINE_EDITION_STANDARD, STATIC_INFO_ENGINE_EDITION, STATIC_INFO_FULL_SERVERNAME, STATIC_INFO_INSTANCENAME, @@ -23,6 +27,7 @@ STATIC_INFO_VERSION, ) from datadog_checks.sqlserver.metrics import SqlFractionMetric +from datadog_checks.sqlserver.schemas import KEY_PREFIX, KEY_PREFIX_PRE_2017, SQLServerSchemaCollector from datadog_checks.sqlserver.sqlserver import SQLConnectionError from datadog_checks.sqlserver.utils import ( Database, @@ -65,6 +70,316 @@ def test_construct_use_statement(db_name, expected): assert use_stmt == expected +def create_schema_collector(static_info_cache: dict | None = None) -> SQLServerSchemaCollector: + check = mock.Mock() + check._config.schema_config = {} + check.log = mock.Mock() + check.static_info_cache = static_info_cache or {} + return SQLServerSchemaCollector(check) + + +def test_schema_collector_records_database_compatibility_levels() -> None: + collector = create_schema_collector({}) + databases = [ + { + "name": "db_130", + "id": "1", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "130", + }, + { + "name": "db_120", + "id": "2", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "120", + }, + ] + + collector._record_database_compatibility_levels(databases) + + assert collector._database_compatibility_levels == {"db_130": 130, "db_120": 120} + assert databases == [ + { + "name": "db_130", + "id": "1", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "130", + }, + { + "name": "db_120", + "id": "2", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "120", + }, + ] + + +def test_schema_collector_emits_database_compatibility_levels() -> None: + collector = create_schema_collector({}) + collector._check.get_databases.return_value = ["db_130"] + cursor = mock.Mock() + collector._check.connection.open_managed_default_connection.return_value = contextlib.nullcontext() + collector._check.connection.get_managed_cursor.return_value = contextlib.nullcontext(cursor) + databases = [ + { + "name": "db_130", + "id": "1", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "130", + } + ] + + with mock.patch("datadog_checks.sqlserver.schemas.execute_query", return_value=databases): + collected_databases = collector._get_databases() + + assert collector._database_compatibility_levels == {"db_130": 130} + assert collected_databases == [ + { + "name": "db_130", + "id": "1", + "collation": "SQL_Latin1_General_CP1_CI_AS", + "owner": "dbo", + "compatibility_level": "130", + } + ] + assert databases[0]["compatibility_level"] == "130" + + +@pytest.mark.parametrize( + 'engine_edition, major_version, compatibility_level, expected_legacy', + [ + (ENGINE_EDITION_SQL_DATABASE, 12, 130, False), + (ENGINE_EDITION_SQL_DATABASE, 12, 120, True), + (ENGINE_EDITION_AZURE_MANAGED_INSTANCE, 12, 130, False), + (ENGINE_EDITION_AZURE_MANAGED_INSTANCE, 12, 120, True), + (ENGINE_EDITION_STANDARD, 13, 170, True), + (ENGINE_EDITION_STANDARD, 14, 130, False), + (ENGINE_EDITION_STANDARD, 14, 100, True), + (ENGINE_EDITION_STANDARD, 0, 170, True), + ], +) +def test_schema_collector_legacy_query_detection( + engine_edition: int, major_version: int, compatibility_level: int, expected_legacy: bool +) -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: engine_edition, + STATIC_INFO_MAJOR_VERSION: major_version, + } + ) + collector._database_compatibility_levels = {"datadog_test": compatibility_level} + + assert collector._should_use_legacy_schema_query("datadog_test") is expected_legacy + + +@pytest.mark.parametrize( + 'engine_edition, major_version', + [ + (ENGINE_EDITION_SQL_DATABASE, 12), + (ENGINE_EDITION_AZURE_MANAGED_INSTANCE, 12), + (ENGINE_EDITION_STANDARD, 14), + ], +) +def test_schema_collector_uses_legacy_query_when_compatibility_level_is_not_recorded( + engine_edition: int, major_version: int +) -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: engine_edition, + STATIC_INFO_MAJOR_VERSION: major_version, + } + ) + + assert collector._should_use_legacy_schema_query("datadog_test") is True + + +@pytest.mark.parametrize( + 'engine_edition, major_version, compatibility_level, expected_legacy', + [ + (ENGINE_EDITION_SQL_DATABASE, 12, 130, False), + (ENGINE_EDITION_SQL_DATABASE, 12, 120, True), + (ENGINE_EDITION_AZURE_MANAGED_INSTANCE, 12, 130, False), + (ENGINE_EDITION_AZURE_MANAGED_INSTANCE, 12, 120, True), + (ENGINE_EDITION_STANDARD, 14, 130, False), + (ENGINE_EDITION_STANDARD, 14, 120, True), + ], +) +def test_schema_collector_uses_database_compatibility_level_for_schema_query( + engine_edition: int, major_version: int, compatibility_level: int, expected_legacy: bool +) -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: engine_edition, + STATIC_INFO_MAJOR_VERSION: major_version, + } + ) + collector._database_compatibility_levels = {"datadog_test": compatibility_level} + main_cursor = mock.Mock() + detail_cursor = mock.Mock() + collector._check.connection.open_managed_default_connection.side_effect = [ + contextlib.nullcontext(), + contextlib.nullcontext(), + ] + collector._check.connection.get_managed_cursor.side_effect = [ + contextlib.nullcontext(main_cursor), + contextlib.nullcontext(detail_cursor), + ] + + with collector._get_cursor("datadog_test"): + pass + + tables_query = main_cursor.execute.call_args_list[1][0][0] + assert collector._is_2016_or_earlier is expected_legacy + assert ("STRING_AGG" in tables_query) is not expected_legacy + assert (detail_cursor.execute.call_count == 1) is expected_legacy + + +def test_schema_collector_reuses_pre_2017_connection_for_table_detail_queries() -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD, + STATIC_INFO_MAJOR_VERSION: 13, + } + ) + main_cursor = mock.Mock() + detail_cursor = mock.Mock() + detail_cursor.fetchall_dict.side_effect = [ + [{"name": "id"}], + [{"name": "pk_table_1"}], + [{"foreign_key_name": "fk_table_1"}], + [{"name": "id"}], + [{"name": "pk_table_2"}], + [{"foreign_key_name": "fk_table_2"}], + ] + detail_cursor.fetchone_dict.side_effect = [{"partition_count": 1}, {"partition_count": 2}] + collector._check.connection.open_managed_default_connection.side_effect = [ + contextlib.nullcontext(), + contextlib.nullcontext(), + ] + collector._check.connection.get_managed_cursor.side_effect = [ + contextlib.nullcontext(main_cursor), + contextlib.nullcontext(detail_cursor), + ] + database = {"name": "datadog_test", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"} + rows = [ + {"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table_1", "table_id": 101}, + {"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table_2", "table_id": 102}, + ] + + with collector._get_cursor("datadog_test"): + mapped_rows = [collector._map_row(database, row) for row in rows] + + assert collector._check.connection.open_managed_default_connection.call_args_list == [ + mock.call(KEY_PREFIX), + mock.call(KEY_PREFIX_PRE_2017), + ] + assert collector._check.connection.get_managed_cursor.call_args_list == [ + mock.call(KEY_PREFIX), + mock.call(KEY_PREFIX_PRE_2017), + ] + assert detail_cursor.execute.call_args_list[0] == mock.call("USE [datadog_test];") + assert detail_cursor.execute.call_args_list.count(mock.call("USE [datadog_test];")) == 1 + assert detail_cursor.execute.call_count == 9 + assert collector._pre_2017_cursor is None + assert [row["schemas"][0]["tables"][0]["partitions"]["partition_count"] for row in mapped_rows] == [1, 2] + + +def configure_pre_2017_detail_cursor(cursor: mock.Mock, partition_count: int = 1) -> None: + cursor.fetchall_dict.side_effect = [ + [{"name": "id"}], + [{"name": "pk_table"}], + [{"foreign_key_name": "fk_table"}], + ] + cursor.fetchone_dict.return_value = {"partition_count": partition_count} + + +def test_schema_collector_uses_new_pre_2017_connection_for_each_database() -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD, + STATIC_INFO_MAJOR_VERSION: 13, + } + ) + main_cursor_1 = mock.Mock() + detail_cursor_1 = mock.Mock() + configure_pre_2017_detail_cursor(detail_cursor_1, partition_count=1) + main_cursor_2 = mock.Mock() + detail_cursor_2 = mock.Mock() + configure_pre_2017_detail_cursor(detail_cursor_2, partition_count=2) + collector._check.connection.open_managed_default_connection.side_effect = [ + contextlib.nullcontext(), + contextlib.nullcontext(), + contextlib.nullcontext(), + contextlib.nullcontext(), + ] + collector._check.connection.get_managed_cursor.side_effect = [ + contextlib.nullcontext(main_cursor_1), + contextlib.nullcontext(detail_cursor_1), + contextlib.nullcontext(main_cursor_2), + contextlib.nullcontext(detail_cursor_2), + ] + row = {"schema_name": "test_schema", "schema_id": 1, "owner_name": "dbo", "table_name": "table", "table_id": 101} + + with collector._get_cursor("datadog_test_1"): + collector._map_row( + {"name": "datadog_test_1", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"}, + row, + ) + with collector._get_cursor("datadog_test_2"): + collector._map_row( + {"name": "datadog_test_2", "id": "2", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"}, + row, + ) + + assert collector._check.connection.get_managed_cursor.call_args_list == [ + mock.call(KEY_PREFIX), + mock.call(KEY_PREFIX_PRE_2017), + mock.call(KEY_PREFIX), + mock.call(KEY_PREFIX_PRE_2017), + ] + assert detail_cursor_1.execute.call_args_list[0] == mock.call("USE [datadog_test_1];") + assert detail_cursor_2.execute.call_args_list[0] == mock.call("USE [datadog_test_2];") + assert collector._pre_2017_cursor is None + + +def test_schema_collector_does_not_open_pre_2017_connection_for_modern_query() -> None: + collector = create_schema_collector( + { + STATIC_INFO_ENGINE_EDITION: ENGINE_EDITION_STANDARD, + STATIC_INFO_MAJOR_VERSION: 14, + } + ) + collector._database_compatibility_levels = {"datadog_test": 130} + cursor = mock.Mock() + collector._check.connection.open_managed_default_connection.return_value = contextlib.nullcontext() + collector._check.connection.get_managed_cursor.return_value = contextlib.nullcontext(cursor) + database = {"name": "datadog_test", "id": "1", "collation": "SQL_Latin1_General_CP1_CI_AS", "owner": "dbo"} + row = { + "schema_name": "test_schema", + "schema_id": 1, + "owner_name": "dbo", + "table_name": "table", + "table_id": 101, + "columns": '[{"name": "id"}]', + "indexes": '[{"name": "pk_table"}]', + "foreign_keys": '[{"foreign_key_name": "fk_table"}]', + "partition_count": 1, + } + + with collector._get_cursor("datadog_test"): + mapped_row = collector._map_row(database, row) + + assert collector._check.connection.open_managed_default_connection.call_args_list == [mock.call(KEY_PREFIX)] + assert collector._check.connection.get_managed_cursor.call_args_list == [mock.call(KEY_PREFIX)] + assert collector._pre_2017_cursor is None + assert mapped_row["schemas"][0]["tables"][0]["columns"] == [{"name": "id"}] + + def test_get_cursor(instance_docker): """ Ensure we don't leak connection info in case of a KeyError when the