Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion postgres/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,6 @@ files:
- monitor_id
- dbname
- query
- interval_seconds
- entity
properties:
- name: monitor_id
Expand All @@ -641,7 +640,17 @@ files:
- name: query
type: string
- name: interval_seconds
description: |
How often (in seconds) to run this query. At least one of interval_seconds or
schedule must be set. When both are present, schedule takes precedence and
interval_seconds is ignored.
type: integer
- name: schedule
description: |
A standard 5-field cron expression (minute hour dom month dow) specifying
when to run this query. When set, takes precedence over interval_seconds.
At least one of schedule or interval_seconds must be set.
type: string
- name: entity
type: object
required:
Expand Down
4 changes: 0 additions & 4 deletions postgres/datadog_checks/postgres/config_models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

# This file is autogenerated.
# To change this file you should edit assets/configuration/spec.yaml and then run the following commands:
# ddev -x validate config -s <INTEGRATION_NAME>
Expand Down
4 changes: 0 additions & 4 deletions postgres/datadog_checks/postgres/config_models/defaults.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

# This file is autogenerated.
# To change this file you should edit assets/configuration/spec.yaml and then run the following commands:
# ddev -x validate config -s <INTEGRATION_NAME>
Expand Down
13 changes: 8 additions & 5 deletions postgres/datadog_checks/postgres/config_models/instance.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

# This file is autogenerated.
# To change this file you should edit assets/configuration/spec.yaml and then run the following commands:
# ddev -x validate config -s <INTEGRATION_NAME>
Expand Down Expand Up @@ -140,9 +136,16 @@ class Query(BaseModel):
custom_sql_select_fields: Optional[CustomSqlSelectFields] = None
dbname: str
entity: Entity
interval_seconds: int
interval_seconds: Optional[int] = Field(
None,
description='How often (in seconds) to run this query. At least one of interval_seconds or\nschedule must be set. When both are present, schedule takes precedence and\ninterval_seconds is ignored.\n',
)
monitor_id: int
query: str
schedule: Optional[str] = Field(
None,
description='A standard 5-field cron expression (minute hour dom month dow) specifying\nwhen to run this query. When set, takes precedence over interval_seconds.\nAt least one of schedule or interval_seconds must be set.\n',
)
type: Optional[str] = None


Expand Down
4 changes: 0 additions & 4 deletions postgres/datadog_checks/postgres/config_models/shared.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
# (C) Datadog, Inc. 2021-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

# This file is autogenerated.
# To change this file you should edit assets/configuration/spec.yaml and then run the following commands:
# ddev -x validate config -s <INTEGRATION_NAME>
Expand Down
66 changes: 58 additions & 8 deletions postgres/datadog_checks/postgres/data_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

import json
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

import psycopg
from croniter import croniter

from datadog_checks.base.utils.db.utils import DBMAsyncJob, default_json_event_encoding

Expand All @@ -26,6 +27,8 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
self._check = check
self._config = config
self._last_execution: dict[int, float] = {}
self._next_run: dict[int, float] = {}
self._invalid_warned: set[int] = set()
collection_interval = config.data_observability.collection_interval or 10
super(PostgresDataObservability, self).__init__(
check,
Expand All @@ -42,14 +45,47 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
def _do_config(self):
return self._config.data_observability

def _get_due_queries(self) -> list[Query]:
def _warn_invalid_once(self, monitor_id: int, reason: str) -> None:
if monitor_id not in self._invalid_warned:
self._invalid_warned.add(monitor_id)
self._log.warning("Skipping invalid DO query monitor_id=%d: %s", monitor_id, reason)

def _query_mode(self, q: Query) -> Literal["cron", "interval"] | None:
if q.schedule:
if not croniter.is_valid(q.schedule):
self._warn_invalid_once(q.monitor_id, f"invalid cron schedule: {q.schedule!r}")
return None
return "cron"
if q.interval_seconds and q.interval_seconds > 0:
return "interval"
self._warn_invalid_once(q.monitor_id, "neither schedule nor positive interval_seconds set")
return None

def _compute_next_run(self, schedule: str, base: float) -> float:
    """Return the first epoch timestamp strictly after *base* matching the cron *schedule*."""
    cron_iter = croniter(schedule, base)
    return cron_iter.get_next(float)

def _get_due_queries(self) -> list[tuple[Query, float]]:
queries = self._do_config.queries or ()
now = time.time()
due = []
due: list[tuple[Query, float]] = []
for q in queries:
last_run = self._last_execution.get(q.monitor_id, 0.0)
if now - last_run >= q.interval_seconds:
due.append(q)
mode = self._query_mode(q)
if mode == "cron":
next_run = self._next_run.get(q.monitor_id)
if next_run is None:
# First sight of this query: schedule the first future tick;
# do NOT fire immediately on load.
self._next_run[q.monitor_id] = self._compute_next_run(q.schedule, now)
continue
if now >= next_run:
due.append((q, next_run))
elif mode == "interval":
last_run = self._last_execution.get(q.monitor_id, 0.0)
if now - last_run >= q.interval_seconds:
# First fire (no prior last_run): treat scheduled time as `now` → lateness = 0.
scheduled = (last_run + q.interval_seconds) if last_run > 0 else now
due.append((q, scheduled))
# mode is None → invalid query, already warned, skip
return due

def _build_base_tags(self) -> list[str]:
Expand Down Expand Up @@ -144,16 +180,20 @@ def run_job(self):

base_tags = self._build_base_tags()

for q in due_queries:
for q, scheduled_fire_time in due_queries:
tags = base_tags + [f'monitor_id:{q.monitor_id}']

now_at_fire_start = time.time()
with self._check.db_pool.get_connection(q.dbname) as conn:
result = self._execute_single_query(conn, q)

# Update scheduling timestamp immediately after execution, before
# metric/event emission, so a serialization failure in the event
# path cannot cause infinite re-execution of the same query.
self._last_execution[q.monitor_id] = time.time()
now_at_fire_end = time.time()
self._last_execution[q.monitor_id] = now_at_fire_end
if q.schedule:
self._next_run[q.monitor_id] = self._compute_next_run(q.schedule, now_at_fire_end)

try:
self._check.gauge(
Expand All @@ -171,6 +211,16 @@ def run_job(self):
raw=True,
)

mode_tag = 'mode:cron' if q.schedule else 'mode:interval'
lateness = max(0.0, now_at_fire_start - scheduled_fire_time)
self._check.gauge(
'dd.postgres.data_observability.query_fire_lateness_seconds',
lateness,
tags=tags + [mode_tag],
hostname=self._check.reported_hostname,
raw=True,
)

payload = self._build_event_payload(q, result)
raw_event = json.dumps(payload, default=default_json_event_encoding)
self._log.debug(
Expand Down
1 change: 1 addition & 0 deletions postgres/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ deps = [
"azure-identity==1.25.3",
"boto3==1.42.72",
"cachetools==7.0.5",
"croniter==6.2.2",
"psycopg[c,pool]==3.3.3",
"semver==3.0.4",
]
Expand Down
96 changes: 96 additions & 0 deletions postgres/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,99 @@ def test_autodiscovery_exclude_none_does_not_error(mock_check):
config, result = build_config(check=mock_check)
assert result.valid
assert config.dbname == 'main'


# ---------------------------------------------------------------------------
# Data Observability — schedule field config tests
# ---------------------------------------------------------------------------


def test_do_query_schedule_field_defaults_to_none(mock_check, minimal_instance):
    """A DO query without schedule field has schedule=None by default."""
    entity = {
        'platform': 'aws',
        'account': '123',
        'database': 'mydb',
        'schema': 'public',
        'table': 'foo',
    }
    do_query = {
        'monitor_id': 1,
        'dbname': 'mydb',
        'query': 'SELECT 1',
        'interval_seconds': 60,
        'entity': entity,
    }
    minimal_instance['data_observability'] = {
        'enabled': True,
        'run_sync': True,
        'queries': [do_query],
    }
    mock_check.instance = minimal_instance
    mock_check.init_config = {}
    config, result = build_config(check=mock_check)
    assert result.valid
    parsed = config.data_observability.queries[0]
    assert parsed.schedule is None
    assert parsed.interval_seconds == 60


def test_do_query_schedule_field_parsed(mock_check, minimal_instance):
    """A DO query with schedule field is parsed correctly; interval_seconds may be absent."""
    entity = {
        'platform': 'aws',
        'account': '123',
        'database': 'mydb',
        'schema': 'public',
        'table': 'bar',
    }
    do_query = {
        'monitor_id': 2,
        'dbname': 'mydb',
        'query': 'SELECT 1',
        'schedule': '20 * * * *',
        'entity': entity,
    }
    minimal_instance['data_observability'] = {
        'enabled': True,
        'run_sync': True,
        'queries': [do_query],
    }
    mock_check.instance = minimal_instance
    mock_check.init_config = {}
    config, result = build_config(check=mock_check)
    assert result.valid
    parsed = config.data_observability.queries[0]
    assert parsed.schedule == '20 * * * *'
    assert parsed.interval_seconds is None


def test_do_query_both_schedule_and_interval_parsed(mock_check, minimal_instance):
    """A DO query with both schedule and interval_seconds is accepted; both fields present."""
    entity = {
        'platform': 'aws',
        'account': '123',
        'database': 'mydb',
        'schema': 'public',
        'table': 'baz',
    }
    do_query = {
        'monitor_id': 3,
        'dbname': 'mydb',
        'query': 'SELECT 1',
        'schedule': '0 * * * *',
        'interval_seconds': 3600,
        'entity': entity,
    }
    minimal_instance['data_observability'] = {
        'enabled': True,
        'run_sync': True,
        'queries': [do_query],
    }
    mock_check.instance = minimal_instance
    mock_check.init_config = {}
    config, result = build_config(check=mock_check)
    assert result.valid
    parsed = config.data_observability.queries[0]
    assert parsed.schedule == '0 * * * *'
    assert parsed.interval_seconds == 3600
Loading
Loading