Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ingestion/src/metadata/data_quality/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from metadata.config.common import ConfigModel
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
Expand Down Expand Up @@ -51,6 +51,9 @@ class TestSuiteProcessorConfig(ConfigModel):
class TestCaseResultResponse(BaseModel):
    """Pairs an executed test case with its result, plus optional
    failure diagnostics produced during validation."""

    # Outcome of the test case execution.
    testCaseResult: TestCaseResult
    # The test case that produced the result above.
    testCase: TestCase
    # Sample of rows that failed the test, when the validator captured one
    # (presumably only validators with failed-row sampling set this — confirm).
    failedRowsSample: Optional[TableData] = None
    # Query a user can run to inspect the failing rows — TODO confirm producer.
    inspectionQuery: Optional[str] = None
    # Whether sampled columns should be validated downstream — verify consumers.
    validateColumns: bool = True


class TableAndTests(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from abc import ABC, abstractmethod
from typing import Optional, Set, Type

from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.builders.validator_builder import ValidatorBuilder
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
Expand All @@ -27,7 +28,7 @@
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.basic import TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
Expand Down Expand Up @@ -108,7 +109,7 @@ def _set_runtime_params_setter_fact(
"""
cls.runtime_params_setter_fact = class_fact

def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResultResponse]:
"""run column data quality tests"""
runtime_params_setter_fact: RuntimeParameterSetterFactory = (
self._get_runtime_params_setter_fact()
Expand Down Expand Up @@ -138,11 +139,14 @@ def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
)
logger.exception(message)
return validator.get_test_case_result_object(
validator.execution_date,
TestCaseStatus.Aborted,
message,
[],
return TestCaseResultResponse(
testCase=test_case,
testCaseResult=validator.get_test_case_result_object(
validator.execution_date,
TestCaseStatus.Aborted,
message,
[],
),
)

def _get_table_config(self):
Expand Down
9 changes: 2 additions & 7 deletions ingestion/src/metadata/data_quality/runner/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""


from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.tests.testCase import TestCase
from metadata.utils.logger import test_suite_logger
Expand All @@ -34,12 +33,8 @@ def run_and_handle(self, test_case: TestCase):
f"Executing test case {test_case.name.root} "
f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.root}"
)
test_result = self.test_runner_interface.run_test_case(
result = self.test_runner_interface.run_test_case(
test_case,
)

if test_result:
return TestCaseResultResponse(
testCaseResult=test_result, testCase=test_case
)
return None
return result
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from pydantic import BaseModel

from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.validations import utils
from metadata.data_quality.validations.impact_score import (
DEFAULT_TOP_DIMENSIONS,
Expand Down Expand Up @@ -134,7 +135,7 @@ def _get_top_dimensions(self) -> int:
return DEFAULT_TOP_DIMENSIONS
return min(value, MAX_TOP_DIMENSIONS)

def run_validation(self) -> TestCaseResult:
def run_validation(self) -> TestCaseResultResponse:
"""Template method defining the validation flow with optional dimensional analysis
This method orchestrates the overall validation process:
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_validation() is now typed to return TestCaseResultResponse, but there is still an early return in this method that returns test_result (a TestCaseResult) when dimension columns are invalid. That creates an inconsistent return type and will break callers expecting the response wrapper; wrap the early-return path in TestCaseResultResponse as well.

Copilot uses AI. Check for mistakes.
Comment on lines 135 to 141
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing BaseTestValidator.run_validation() to return TestCaseResultResponse is a breaking API change for other callers that still expect a TestCaseResult (e.g., ingestion/src/metadata/sdk/data_quality/dataframes/dataframe_validation_engine.py calls validator.run_validation() and treats the return as TestCaseResult). Please update those callers (or provide a backward-compatible API) to avoid runtime type errors.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -186,7 +187,20 @@ def run_validation(self) -> TestCaseResult:
)
logger.debug(traceback.format_exc())

return test_result
result: TestCaseResultResponse = TestCaseResultResponse(
testCaseResult=test_result, testCase=self.test_case
)

self.result_with_failed_samples(result)

return result
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that run_validation() returns TestCaseResultResponse, please ensure all code paths return a TestCaseResultResponse. There is still an early-return path earlier in the method that returns the raw TestCaseResult (e.g., when dimension columns are invalid), which will break callers expecting testCase/failedRowsSample on the response.

Copilot uses AI. Check for mistakes.

def result_with_failed_samples(self, result: TestCaseResultResponse) -> None:
    """Hook for failed row sampling. No-op by default.

    Overridden by FailedSampleValidatorMixin to fetch failed row
    samples and attach them to the validator instance. Mutates
    ``result`` in place (when overridden); always returns ``None``.

    Args:
        result: the response wrapper for the just-computed test result,
            available for enrichment with sample data.
    """

@abstractmethod
def _run_validation(self) -> TestCaseResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@
BaseColumnValueLengthsToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
Expand All @@ -41,7 +48,10 @@


class ColumnValueLengthsToBeBetweenValidator(
BaseColumnValueLengthsToBeBetweenValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValueLengthsToBeBetweenValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value lengths to be between test case"""

Expand Down Expand Up @@ -238,3 +248,21 @@ def compute_row_count(self, column: SQALikeColumn, min_bound: int, max_bound: in
)

return row_count, failed_rows

def filter(self):
    """Build the pandas query expression selecting failing rows: those
    whose string length falls outside [minLength, maxLength].

    Returns an empty string when neither bound is constraining.
    """
    lower = self.get_min_bound("minLength")
    upper = self.get_max_bound("maxLength")
    # get_column() is a plain accessor; hoist the shared length expression.
    length_expr = f"{self.get_column().name}.astype('str').str.len()"
    clauses = []
    if lower is not None and lower > float("-inf"):
        clauses.append(f"{length_expr} < {lower}")
    if upper is not None and upper < float("inf"):
        clauses.append(f"{length_expr} > {upper}")
    return " or ".join(clauses)

def fetch_failed_rows_sample(self):
    """Return the sampled failed rows wrapped in a TableData payload."""
    sample_columns, sample_rows = self._get_failed_rows_sample()
    return TableData(columns=sample_columns, rows=sample_rows)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""

from collections import defaultdict
from datetime import datetime
from typing import List, Optional, cast

import pandas as pd
Expand All @@ -27,20 +28,32 @@
BaseColumnValuesToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.orm.registry import is_date_time
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn
from metadata.utils.time_utils import convert_timestamp

logger = test_suite_logger()


class ColumnValuesToBeBetweenValidator(
BaseColumnValuesToBeBetweenValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeBetweenValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column values to be between test case"""

Expand Down Expand Up @@ -237,3 +250,34 @@ def compute_row_count(self, column: SQALikeColumn, min_bound: int, max_bound: in
)

return row_count, failed_rows

def filter(self):
    """Build the pandas query expression selecting failing rows: those
    outside the configured [minValue, maxValue] bounds.

    For datetime columns the bounds are rendered as quoted ISO-8601
    strings so the resulting expression is valid ``DataFrame.query``
    syntax — interpolating a raw ``datetime`` object would produce an
    unparseable expression such as ``col < 2025-01-01 00:00:00`` and
    raise at runtime.

    Returns an empty string when neither bound is set.
    """
    column = self.get_column()
    if is_date_time(column.type):
        min_bound = self.get_test_case_param_value(
            self.test_case.parameterValues,
            "minValue",
            type_=datetime.fromtimestamp,
            default=datetime.min,
            pre_processor=convert_timestamp,
        )
        max_bound = self.get_test_case_param_value(
            self.test_case.parameterValues,
            "maxValue",
            type_=datetime.fromtimestamp,
            default=datetime.max,
            pre_processor=convert_timestamp,
        )
    else:
        min_bound = self.get_min_bound("minValue")
        max_bound = self.get_max_bound("maxValue")

    def _render(bound):
        # Quote datetime bounds so DataFrame.query can parse them;
        # numeric bounds interpolate as-is.
        if isinstance(bound, datetime):
            return f"'{bound.isoformat()}'"
        return bound

    filters = []
    if min_bound is not None:
        filters.append(f"{column.name} < {_render(min_bound)}")
    if max_bound is not None:
        filters.append(f"{column.name} > {_render(max_bound)}")
    return " or ".join(filters)

def fetch_failed_rows_sample(self):
    """Package the sampled failed rows as a TableData object."""
    sample = self._get_failed_rows_sample()
    return TableData(columns=sample[0], rows=sample[1])
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Validator for column value to be in set test case
"""

from ast import literal_eval
from collections import defaultdict
from typing import List, Optional, cast

Expand All @@ -27,10 +28,17 @@
BaseColumnValuesToBeInSetValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
Expand All @@ -41,7 +49,10 @@


class ColumnValuesToBeInSetValidator(
BaseColumnValuesToBeInSetValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeInSetValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value to be in set test case"""

Expand Down Expand Up @@ -196,3 +207,15 @@ def compute_row_count(self, column: SQALikeColumn):
NotImplementedError:
"""
return self._compute_row_count(self.runner, column)

def filter(self):
    """Pandas query expression matching the failing rows: values that
    are NOT in the configured ``allowedValues`` set."""
    allowed = self.get_test_case_param_value(
        self.test_case.parameterValues, "allowedValues", literal_eval
    )
    column_name = self.get_column().name
    return f"~{column_name}.isin({allowed})"

def fetch_failed_rows_sample(self):
    """Return the failed-row sample as a TableData payload."""
    columns, rows = self._get_failed_rows_sample()
    return TableData(columns=columns, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Validator for column value to be not in set test case
"""

from ast import literal_eval
from collections import defaultdict
from typing import List, Optional, cast

Expand All @@ -27,10 +28,17 @@
BaseColumnValuesToBeNotInSetValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
Expand All @@ -41,7 +49,10 @@


class ColumnValuesToBeNotInSetValidator(
BaseColumnValuesToBeNotInSetValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeNotInSetValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value to be not in set test case"""

Expand Down Expand Up @@ -182,3 +193,15 @@ def compute_row_count(self, column: SQALikeColumn):
NotImplementedError:
"""
return self._compute_row_count(self.runner, column)

def filter(self):
    """Pandas query expression matching the failing rows: values that
    ARE in the configured ``forbiddenValues`` set."""
    forbidden = self.get_test_case_param_value(
        self.test_case.parameterValues, "forbiddenValues", literal_eval
    )
    return f"{self.get_column().name}.isin({forbidden})"

def fetch_failed_rows_sample(self):
    """Wrap the sampled failed rows in a TableData object."""
    cols, sampled_rows = self._get_failed_rows_sample()
    return TableData(rows=sampled_rows, columns=cols)
Loading
Loading