Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ingestion/src/metadata/data_quality/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from metadata.config.common import ConfigModel
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
Expand Down Expand Up @@ -51,6 +51,9 @@ class TestSuiteProcessorConfig(ConfigModel):
class TestCaseResultResponse(BaseModel):
    """Pairs an executed test case with its result and optional
    failed-row sampling artifacts."""

    testCaseResult: TestCaseResult
    testCase: TestCase
    # Sample of rows that failed the test case, when sampling is available
    failedRowsSample: Optional[TableData] = None
    # NOTE(review): presumably a query callers can run to inspect failing
    # rows — confirm against the producer of this field
    inspectionQuery: Optional[str] = None
    # NOTE(review): assumed to control column validation of the sampled
    # data before ingestion — TODO confirm
    validateColumns: bool = True


class TableAndTests(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from abc import ABC, abstractmethod
from typing import Optional, Set, Type

from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.builders.validator_builder import ValidatorBuilder
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
Expand All @@ -27,7 +28,7 @@
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.basic import TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
Expand Down Expand Up @@ -108,7 +109,7 @@ def _set_runtime_params_setter_fact(
"""
cls.runtime_params_setter_fact = class_fact

def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResultResponse]:
"""run column data quality tests"""
runtime_params_setter_fact: RuntimeParameterSetterFactory = (
self._get_runtime_params_setter_fact()
Expand Down Expand Up @@ -138,11 +139,14 @@ def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
)
logger.exception(message)
return validator.get_test_case_result_object(
validator.execution_date,
TestCaseStatus.Aborted,
message,
[],
return TestCaseResultResponse(
testCase=test_case,
testCaseResult=validator.get_test_case_result_object(
validator.execution_date,
TestCaseStatus.Aborted,
message,
[],
),
)

def _get_table_config(self):
Expand Down
9 changes: 2 additions & 7 deletions ingestion/src/metadata/data_quality/runner/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""


from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.tests.testCase import TestCase
from metadata.utils.logger import test_suite_logger
Expand All @@ -34,12 +33,8 @@ def run_and_handle(self, test_case: TestCase):
f"Executing test case {test_case.name.root} "
f"for entity {self.test_runner_interface.table_entity.fullyQualifiedName.root}"
)
test_result = self.test_runner_interface.run_test_case(
result = self.test_runner_interface.run_test_case(
test_case,
)

if test_result:
return TestCaseResultResponse(
testCaseResult=test_result, testCase=test_case
)
return None
return result
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from pydantic import BaseModel

from metadata.data_quality.api.models import TestCaseResultResponse
from metadata.data_quality.validations import utils
from metadata.data_quality.validations.impact_score import (
DEFAULT_TOP_DIMENSIONS,
Expand Down Expand Up @@ -134,7 +135,7 @@ def _get_top_dimensions(self) -> int:
return DEFAULT_TOP_DIMENSIONS
return min(value, MAX_TOP_DIMENSIONS)

def run_validation(self) -> TestCaseResult:
def run_validation(self) -> TestCaseResultResponse:
"""Template method defining the validation flow with optional dimensional analysis
This method orchestrates the overall validation process:
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_validation() is now typed to return TestCaseResultResponse, but there is still an early return in this method that returns test_result (a TestCaseResult) when dimension columns are invalid. That creates an inconsistent return type and will break callers expecting the response wrapper; wrap the early-return path in TestCaseResultResponse as well.

Copilot uses AI. Check for mistakes.
Comment on lines 135 to 141
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing BaseTestValidator.run_validation() to return TestCaseResultResponse is a breaking API change for other callers that still expect a TestCaseResult (e.g., ingestion/src/metadata/sdk/data_quality/dataframes/dataframe_validation_engine.py calls validator.run_validation() and treats the return as TestCaseResult). Please update those callers (or provide a backward-compatible API) to avoid runtime type errors.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -186,7 +187,20 @@ def run_validation(self) -> TestCaseResult:
)
logger.debug(traceback.format_exc())

return test_result
result: TestCaseResultResponse = TestCaseResultResponse(
testCaseResult=test_result, testCase=self.test_case
)

self.result_with_failed_samples(result)

return result
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that run_validation() returns TestCaseResultResponse, please ensure all code paths return a TestCaseResultResponse. There is still an early-return path earlier in the method that returns the raw TestCaseResult (e.g., when dimension columns are invalid), which will break callers expecting testCase/failedRowsSample on the response.

Copilot uses AI. Check for mistakes.

def result_with_failed_samples(self, result: TestCaseResultResponse) -> None:
    """Hook for failed row sampling. No-op by default.

    Overridden by FailedSampleValidatorMixin to fetch and stash
    failed row samples on the validator instance.

    Args:
        result: response wrapper that an override may enrich in place
            (mutated, not returned — hence the ``None`` return type).
    """
@abstractmethod
def _run_validation(self) -> TestCaseResult:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@
BaseColumnValueLengthsToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import test_suite_logger
Expand All @@ -41,7 +48,10 @@


class ColumnValueLengthsToBeBetweenValidator(
BaseColumnValueLengthsToBeBetweenValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValueLengthsToBeBetweenValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value lengths to be between test case"""

Expand Down Expand Up @@ -238,3 +248,21 @@ def compute_row_count(self, column: SQALikeColumn, min_bound: int, max_bound: in
)

return row_count, failed_rows

def filter(self):
    """Build a pandas ``query`` expression selecting rows whose string
    length falls outside the configured [minLength, maxLength] bounds.

    Returns an empty string when both bounds are unbounded.
    """
    length_expr = f"{self.get_column().name}.astype('str').str.len()"
    lower = self.get_min_bound("minLength")
    upper = self.get_max_bound("maxLength")

    clauses = []
    if lower is not None and lower > float("-inf"):
        clauses.append(f"{length_expr} < {lower}")
    if upper is not None and upper < float("inf"):
        clauses.append(f"{length_expr} > {upper}")
    return " or ".join(clauses)

def fetch_failed_rows_sample(self):
    """Wrap the sampled failed rows in a ``TableData`` payload."""
    columns, rows = self._get_failed_rows_sample()
    return TableData(columns=columns, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""

from collections import defaultdict
from datetime import datetime
from typing import List, Optional, cast

import pandas as pd
Expand All @@ -27,20 +28,32 @@
BaseColumnValuesToBeBetweenValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_statistical_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.orm.registry import is_date_time
from metadata.utils.logger import test_suite_logger
from metadata.utils.sqa_like_column import SQALikeColumn
from metadata.utils.time_utils import convert_timestamp

logger = test_suite_logger()


class ColumnValuesToBeBetweenValidator(
BaseColumnValuesToBeBetweenValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeBetweenValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column values to be between test case"""

Expand Down Expand Up @@ -237,3 +250,34 @@ def compute_row_count(self, column: SQALikeColumn, min_bound: int, max_bound: in
)

return row_count, failed_rows

def filter(self):
    """Build a pandas ``DataFrame.query`` expression selecting rows that
    fall outside the configured [minValue, maxValue] bounds.

    For datetime columns the bounds are read from the test-case parameters
    and rendered as quoted ISO-8601 literals: interpolating a raw
    ``datetime`` object would emit an unparsable token such as
    ``col < 2024-01-01 00:00:00`` and make ``query`` raise at runtime.

    Returns:
        str: the query expression, or an empty string when no bound applies.
    """
    column = self.get_column()
    if is_date_time(column.type):
        min_bound = self.get_test_case_param_value(
            self.test_case.parameterValues,
            "minValue",
            type_=datetime.fromtimestamp,
            default=datetime.min,
            pre_processor=convert_timestamp,
        )
        max_bound = self.get_test_case_param_value(
            self.test_case.parameterValues,
            "maxValue",
            type_=datetime.fromtimestamp,
            default=datetime.max,
            pre_processor=convert_timestamp,
        )
        # Quote datetime bounds so the query expression stays valid syntax;
        # pandas compares datetime columns against ISO-8601 string literals.
        if min_bound is not None:
            min_bound = f"'{min_bound.isoformat()}'"
        if max_bound is not None:
            max_bound = f"'{max_bound.isoformat()}'"
    else:
        min_bound = self.get_min_bound("minValue")
        max_bound = self.get_max_bound("maxValue")
    filters = []
    if min_bound is not None:
        filters.append(f"{column.name} < {min_bound}")
    if max_bound is not None:
        filters.append(f"{column.name} > {max_bound}")
    return " or ".join(filters)

def fetch_failed_rows_sample(self):
    """Wrap the sampled failed rows in a ``TableData`` payload."""
    columns, rows = self._get_failed_rows_sample()
    return TableData(columns=columns, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Validator for column value to be in set test case
"""

from ast import literal_eval
from collections import defaultdict
from typing import List, Optional, cast

Expand All @@ -27,10 +28,17 @@
BaseColumnValuesToBeInSetValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
Expand All @@ -41,7 +49,10 @@


class ColumnValuesToBeInSetValidator(
BaseColumnValuesToBeInSetValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeInSetValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value to be in set test case"""

Expand Down Expand Up @@ -196,3 +207,15 @@ def compute_row_count(self, column: SQALikeColumn):
NotImplementedError:
"""
return self._compute_row_count(self.runner, column)

def filter(self):
    """Build a pandas ``query`` expression selecting rows whose value is
    NOT among the test case's ``allowedValues``."""
    allowed = self.get_test_case_param_value(
        self.test_case.parameterValues,
        "allowedValues",
        literal_eval,
    )
    column_name = self.get_column().name
    return f"~{column_name}.isin({allowed})"

def fetch_failed_rows_sample(self):
    """Wrap the sampled failed rows in a ``TableData`` payload."""
    columns, rows = self._get_failed_rows_sample()
    return TableData(columns=columns, rows=rows)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Validator for column value to be not in set test case
"""

from ast import literal_eval
from collections import defaultdict
from typing import List, Optional, cast

Expand All @@ -27,10 +28,17 @@
BaseColumnValuesToBeNotInSetValidator,
)
from metadata.data_quality.validations.impact_score import calculate_impact_score_pandas
from metadata.data_quality.validations.mixins.failed_row_sampler_mixin import (
PandasFailedRowSamplerMixin,
)
from metadata.data_quality.validations.mixins.failed_sample_validator_mixin import (
FailedSampleValidatorMixin,
)
from metadata.data_quality.validations.mixins.pandas_validator_mixin import (
PandasValidatorMixin,
aggregate_others_pandas,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.dimensionResult import DimensionResult
from metadata.profiler.metrics.core import add_props
from metadata.profiler.metrics.registry import Metrics
Expand All @@ -41,7 +49,10 @@


class ColumnValuesToBeNotInSetValidator(
BaseColumnValuesToBeNotInSetValidator, PandasValidatorMixin
FailedSampleValidatorMixin,
BaseColumnValuesToBeNotInSetValidator,
PandasValidatorMixin,
PandasFailedRowSamplerMixin,
):
"""Validator for column value to be not in set test case"""

Expand Down Expand Up @@ -182,3 +193,15 @@ def compute_row_count(self, column: SQALikeColumn):
NotImplementedError:
"""
return self._compute_row_count(self.runner, column)

def filter(self):
    """Build a pandas ``query`` expression selecting rows whose value IS
    among the test case's ``forbiddenValues``."""
    forbidden = self.get_test_case_param_value(
        self.test_case.parameterValues,
        "forbiddenValues",
        literal_eval,
    )
    column_name = self.get_column().name
    return f"{column_name}.isin({forbidden})"

def fetch_failed_rows_sample(self):
    """Wrap the sampled failed rows in a ``TableData`` payload."""
    columns, rows = self._get_failed_rows_sample()
    return TableData(columns=columns, rows=rows)
Loading
Loading