1 change: 1 addition & 0 deletions sqlserver/changelog.d/23533.fixed
@@ -0,0 +1 @@
Fix Azure SQL Database and Azure SQL Managed Instance schema collection by using the database compatibility level to select the schema query.
1 change: 1 addition & 0 deletions sqlserver/changelog.d/23544.fixed
@@ -0,0 +1 @@
Reuse the auxiliary SQL Server schema collection connection for legacy table detail queries.
3 changes: 2 additions & 1 deletion sqlserver/datadog_checks/sqlserver/queries.py
@@ -7,7 +7,8 @@
# `{}` is replaced with comma-separated ODBC `?` placeholders (values bound via parameters).
DB_QUERY = """
SELECT
db.database_id AS id, db.name AS name, db.collation_name AS collation, dp.name AS owner
db.database_id AS id, db.name AS name, db.collation_name AS collation, dp.name AS owner,
db.compatibility_level AS compatibility_level
FROM
sys.databases db LEFT JOIN sys.database_principals dp ON db.owner_sid = dp.sid
WHERE db.name IN ({});
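# How the `{}` in DB_QUERY is expected to be filled and bound at call time: a minimal
# sketch mirroring _get_databases in schemas.py below; the database names are illustrative.
#     database_names = ["master", "datadog_test"]
#     placeholders = ",".join(["?"] * len(database_names))  # -> "?,?"
#     query = DB_QUERY.format(placeholders)                 # -> "... WHERE db.name IN (?,?);"
#     cursor.execute(query, tuple(database_names))          # values bound as ODBC parameters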
115 changes: 82 additions & 33 deletions sqlserver/datadog_checks/sqlserver/schemas.py
@@ -8,14 +8,15 @@
from typing import TYPE_CHECKING, TypedDict

from datadog_checks.base.utils.serialization import json
from datadog_checks.sqlserver.utils import construct_use_statement, execute_query
from datadog_checks.sqlserver.utils import construct_use_statement, execute_query, is_azure_database

if TYPE_CHECKING:
from datadog_checks.sqlserver import SQLServer

from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig
from datadog_checks.sqlserver.const import (
DEFAULT_SCHEMAS_COLLECTION_INTERVAL,
STATIC_INFO_ENGINE_EDITION,
STATIC_INFO_MAJOR_VERSION,
)
from datadog_checks.sqlserver.queries import (
@@ -32,13 +33,16 @@

KEY_PREFIX = "dbm-schemas-"
KEY_PREFIX_PRE_2017 = "dbm-schemas-pre-2017"
# The modern schema query uses database-scoped JSON output, which requires compatibility level 130 (SQL Server 2016) or higher.
MINIMUM_JSON_COMPATIBILITY_LEVEL = 130
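# A database's level can be inspected or raised with standard T-SQL, e.g. (a sketch;
# the database name is illustrative):
#     SELECT compatibility_level FROM sys.databases WHERE name = 'datadog_test';
#     ALTER DATABASE datadog_test SET COMPATIBILITY_LEVEL = 130;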


class DatabaseInfo(TypedDict):
name: str
id: str
collation: str
owner: str
compatibility_level: str


# The schema collector sends lists of DatabaseObjects to the agent
@@ -81,40 +85,85 @@ def __init__(self, check: SQLServer):
)
config.max_tables = check._config.schema_config.get('max_tables', 300)
self._is_2016_or_earlier = None
self._database_compatibility_levels: dict[str, int] = {}
self._pre_2017_cursor = None
super().__init__(check, config)

def collect_schemas(self):
# We wait until collect is called to check for static information
major_version = int(self._check.static_info_cache.get(STATIC_INFO_MAJOR_VERSION) or 0)
if major_version == 0:
self._check.log.debug("major_version is not available yet, defaulting to 2016 or earlier")
self._is_2016_or_earlier = major_version <= 13

super().collect_schemas()

@property
def kind(self):
return "sqlserver_databases"

def _get_databases(self):
def _get_databases(self) -> list[DatabaseInfo]:
database_names = self._check.get_databases()
with self._check.connection.open_managed_default_connection(KEY_PREFIX):
with self._check.connection.get_managed_cursor(KEY_PREFIX) as cursor:
if not database_names:
return []
placeholders = ",".join(["?"] * len(database_names))
query = DB_QUERY.format(placeholders)
return execute_query(query, cursor, convert_results_to_str=True, parameters=tuple(database_names))
databases = execute_query(query, cursor, convert_results_to_str=True, parameters=tuple(database_names))
self._record_database_compatibility_levels(databases)
return databases

@contextlib.contextmanager
def _get_cursor(self, database_name):
with self._check.connection.open_managed_default_connection(KEY_PREFIX):
with self._check.connection.get_managed_cursor(KEY_PREFIX) as cursor:
with contextlib.ExitStack() as stack:
try:
stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX))
cursor = stack.enter_context(self._check.connection.get_managed_cursor(KEY_PREFIX))
switch_db_statement = construct_use_statement(database_name)
cursor.execute(switch_db_statement)
self._is_2016_or_earlier = self._should_use_legacy_schema_query(database_name)

if self._is_2016_or_earlier:
stack.enter_context(self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017))
self._pre_2017_cursor = stack.enter_context(
self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017)
)
self._pre_2017_cursor.execute(switch_db_statement)

query = self._get_tables_query()
cursor.execute(query)
yield cursor
finally:
self._pre_2017_cursor = None

def _record_database_compatibility_levels(self, databases: list[DatabaseInfo]) -> None:
self._database_compatibility_levels = {}
for database in databases:
self._database_compatibility_levels[database["name"]] = int(database["compatibility_level"])

def _should_use_legacy_schema_query(self, database_name: str) -> bool:
"""
Return whether the current database needs the legacy schema query.

The modern schema query depends on two SQL Server features:
- STRING_AGG, used to build index column lists. On self-managed SQL Server, this requires SQL Server 2017
or later. Azure SQL Database and Azure SQL Managed Instance report ProductMajorVersion 12 while still
supporting STRING_AGG, so only self-managed SQL Server uses this version gate.
- JSON output, used for column, index, and foreign key metadata. This is controlled by each database's
compatibility_level and requires level 130 or higher.

If ProductMajorVersion is missing, use the legacy query because we cannot confirm that STRING_AGG is
available.
"""
engine_edition = self._check.static_info_cache.get(STATIC_INFO_ENGINE_EDITION)
if not is_azure_database(engine_edition):
major_version = int(self._check.static_info_cache.get(STATIC_INFO_MAJOR_VERSION) or 0)
if major_version == 0:
self._check.log.debug("major_version is not available yet, using legacy schema query")
return True
if major_version <= 13:
return True

compatibility_level = self._database_compatibility_levels.get(database_name)
if compatibility_level is None:
self._check.log.debug(
"compatibility_level is not available for SQL Server database %s, using pre-2017 schema query",
database_name,
)
return True
return compatibility_level < MINIMUM_JSON_COMPATIBILITY_LEVEL
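# Illustrative outcomes of the gate above (a sketch, not an exhaustive matrix):
#     self-managed SQL Server 2016 (ProductMajorVersion 13): legacy query (no STRING_AGG)
#     self-managed SQL Server 2019, compatibility level 110: legacy query (no JSON output)
#     Azure SQL Database, compatibility level >= 130:        modern query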

def _get_tables_query(self):
limit = int(self._config.max_tables or 1_000_000)
@@ -169,25 +218,25 @@ def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject:
object = super()._map_row(database, cursor_row)
if self._is_2016_or_earlier:
# We need to fetch the related data for each table
# Use a key_prefix to get a separate connection to avoid conflicts with the main connection
with self._check.connection.open_managed_default_connection(KEY_PREFIX_PRE_2017):
with self._check.connection.get_managed_cursor(KEY_PREFIX_PRE_2017) as cursor:
switch_db_statement = construct_use_statement(database.get("name"))
cursor.execute(switch_db_statement)
table_id = str(cursor_row.get("table_id"))
columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id)
cursor.execute(columns_query)
columns = cursor.fetchall_dict()
indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
cursor.execute(indexes_query)
indexes = cursor.fetchall_dict()
foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
cursor.execute(foreign_keys_query)
foreign_keys = cursor.fetchall_dict()
partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id)
cursor.execute(partitions_query)
partition_row = cursor.fetchone_dict()
partition_count = partition_row.get("partition_count") if partition_row else None
# Use a separate connection to avoid conflicts with the main cursor while it streams table rows.
cursor = self._pre_2017_cursor
if cursor is None:
raise RuntimeError("pre-2017 schema cursor is not initialized")

table_id = str(cursor_row.get("table_id"))
columns_query = COLUMN_QUERY.replace("schema_tables.table_id", table_id)
cursor.execute(columns_query)
columns = cursor.fetchall_dict()
indexes_query = INDEX_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
cursor.execute(indexes_query)
indexes = cursor.fetchall_dict()
foreign_keys_query = FOREIGN_KEY_QUERY_PRE_2017.replace("schema_tables.table_id", table_id)
cursor.execute(foreign_keys_query)
foreign_keys = cursor.fetchall_dict()
partitions_query = PARTITIONS_QUERY.replace("schema_tables.table_id", table_id)
cursor.execute(partitions_query)
partition_row = cursor.fetchone_dict()
partition_count = partition_row.get("partition_count") if partition_row else None
else:
columns = json.loads(cursor_row.get("columns") or "[]")
indexes = json.loads(cursor_row.get("indexes") or "[]")
9 changes: 9 additions & 0 deletions sqlserver/tests/test_metadata.py
@@ -21,6 +21,11 @@
pyodbc = None


def normalize_compatibility_level(actual_payload):
assert actual_payload['compatibility_level'].isdigit()
actual_payload['compatibility_level'] = 'normalized_value'


@pytest.fixture
def dbm_instance(instance_docker):
instance_docker['dbm'] = True
@@ -101,6 +106,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance):
'name': 'datadog_test_schemas_second',
"collation": "SQL_Latin1_General_CP1_CI_AS",
'owner': 'dbo',
'compatibility_level': 'normalized_value',
'schemas': [
{
'name': 'dbo',
@@ -147,6 +153,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance):
'name': 'datadog_test_schemas',
"collation": "SQL_Latin1_General_CP1_CI_AS",
'owner': 'dbo',
'compatibility_level': 'normalized_value',
'schemas': [
{
'name': 'test_schema',
@@ -373,6 +380,8 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance):
assert db_name in databases_to_find
# ids are env dependent
normalize_ids(actual_payload)
# compatibility_level varies by SQL Server version
normalize_compatibility_level(actual_payload)
# index columns may be in any order
normalize_indexes_columns(actual_payload)
matches = deep_compare(actual_payload, expected_data_for_db[db_name])
62 changes: 53 additions & 9 deletions sqlserver/tests/test_schemas.py
@@ -4,6 +4,7 @@

import json
from typing import Callable, Optional
from unittest import mock

import pytest

@@ -15,9 +16,12 @@

pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]

SCHEMA_DATABASE = 'datadog_test_schemas'


@pytest.fixture
def dbm_instance(instance_docker):
instance_docker['database'] = SCHEMA_DATABASE
instance_docker['dbm'] = True
instance_docker['min_collection_interval'] = 0.1
instance_docker['query_samples'] = {'enabled': False}
@@ -47,11 +51,17 @@ def _check(instance: dict, init_config: dict = None):
c.cancel()


def create_schema_collector(check: SQLServer) -> SQLServerSchemaCollector:
collector = SQLServerSchemaCollector(check)
collector._get_databases()
return collector


def test_get_cursor(dbm_instance, integration_check):
check = integration_check(dbm_instance)
collector = SQLServerSchemaCollector(check)
collector = create_schema_collector(check)

with collector._get_cursor('datadog_test_schemas') as cursor:
with collector._get_cursor(SCHEMA_DATABASE) as cursor:
assert cursor is not None
schemas = []
rows = cursor.fetchall_dict()
@@ -65,9 +75,9 @@

def test_tables(dbm_instance, integration_check):
check = integration_check(dbm_instance)
collector = SQLServerSchemaCollector(check)
collector = create_schema_collector(check)

with collector._get_cursor('datadog_test_schemas') as cursor:
with collector._get_cursor(SCHEMA_DATABASE) as cursor:
assert cursor is not None
tables = []
rows = cursor.fetchall_dict()
@@ -80,13 +90,12 @@

def test_columns(dbm_instance, integration_check):
check = integration_check(dbm_instance)
collector = SQLServerSchemaCollector(check)
collector = create_schema_collector(check)

with collector._get_cursor('datadog_test_schemas') as cursor:
with collector._get_cursor(SCHEMA_DATABASE) as cursor:
assert cursor is not None
# Assert that at least one row has columns
rows = cursor.fetchall_dict()
print(rows)
assert any(row['columns'] for row in rows)
for row in rows:
if row['columns']:
@@ -101,9 +110,9 @@

def test_indexes(dbm_instance, integration_check):
check = integration_check(dbm_instance)
collector = SQLServerSchemaCollector(check)
collector = create_schema_collector(check)

with collector._get_cursor('datadog_test_schemas') as cursor:
with collector._get_cursor(SCHEMA_DATABASE) as cursor:
assert cursor is not None
# Assert that at least one row has indexes
rows = cursor.fetchall_dict()
@@ -139,3 +148,38 @@ def test_collect_schemas_pre_2017(dbm_instance, integration_check):
collector = SQLServerSchemaCollector(check)

collector.collect_schemas()


def test_collect_schemas_pre_2017_collects_table_details(dbm_instance, integration_check) -> None:
check = integration_check(dbm_instance)
check.static_info_cache[STATIC_INFO_MAJOR_VERSION] = 13
collector = SQLServerSchemaCollector(check)

with mock.patch.object(check, "database_monitoring_metadata") as database_monitoring_metadata:
collector.collect_schemas()

assert collector._is_2016_or_earlier is True

tables = {}
for call in database_monitoring_metadata.call_args_list:
event = json.loads(call.args[0])
for database in event["metadata"]:
if database["name"] != SCHEMA_DATABASE:
continue
for schema in database["schemas"]:
for table in schema["tables"]:
tables[table["name"]] = table

assert set(tables) == {'cities', 'Restaurants', 'RestaurantReviews', 'landmarks'}

cities = tables["cities"]
assert {column["name"] for column in cities["columns"]} == {"id", "name", "population"}
assert {index["name"] for index in cities["indexes"]} == {
"PK_Cities",
"single_column_index",
"two_columns_index",
}
assert cities["partitions"]["partition_count"] > 0

landmarks = tables["landmarks"]
assert any(foreign_key["foreign_key_name"] == "FK_CityId" for foreign_key in landmarks["foreign_keys"])