diff --git a/presidio-analyzer/presidio_analyzer/analyzer_engine_provider.py b/presidio-analyzer/presidio_analyzer/analyzer_engine_provider.py index c198e9000d..1ca9c4665e 100644 --- a/presidio-analyzer/presidio_analyzer/analyzer_engine_provider.py +++ b/presidio-analyzer/presidio_analyzer/analyzer_engine_provider.py @@ -5,6 +5,7 @@ import yaml from presidio_analyzer import AnalyzerEngine, RecognizerRegistry +from presidio_analyzer.input_validation import ConfigurationValidator from presidio_analyzer.nlp_engine import NlpEngine, NlpEngineProvider from presidio_analyzer.recognizer_registry import RecognizerRegistryProvider @@ -29,6 +30,13 @@ def __init__( nlp_engine_conf_file: Optional[Union[Path, str]] = None, recognizer_registry_conf_file: Optional[Union[Path, str]] = None, ): + if analyzer_engine_conf_file: + ConfigurationValidator.validate_file_path(analyzer_engine_conf_file) + if nlp_engine_conf_file: + ConfigurationValidator.validate_file_path(nlp_engine_conf_file) + if recognizer_registry_conf_file: + ConfigurationValidator.validate_file_path(recognizer_registry_conf_file) + self.configuration = self.get_configuration(conf_file=analyzer_engine_conf_file) self.nlp_engine_conf_file = nlp_engine_conf_file self.recognizer_registry_conf_file = recognizer_registry_conf_file @@ -36,7 +44,7 @@ def __init__( def get_configuration( self, conf_file: Optional[Union[Path, str]] ) -> Union[Dict[str, Any]]: - """Retrieve the analyzer engine configuration from the provided file.""" + """Retrieve analyzer engine configuration from the provided file.""" if not conf_file: default_conf_file = self._get_full_conf_path() @@ -59,10 +67,15 @@ def get_configuration( with open(self._get_full_conf_path()) as file: configuration = yaml.safe_load(file) except Exception: - print(f"Failed to parse file {conf_file}, resorting to default") + logger.warning( + f"Failed to parse file {conf_file}, resorting to default" + ) with open(self._get_full_conf_path()) as file: configuration = 
yaml.safe_load(file) + ConfigurationValidator.validate_analyzer_configuration(configuration) + logger.debug("Analyzer configuration validation passed") + return configuration def create_engine(self) -> AnalyzerEngine: diff --git a/presidio-analyzer/presidio_analyzer/analyzer_request.py b/presidio-analyzer/presidio_analyzer/analyzer_request.py index 87574c7e7b..669d8bb822 100644 --- a/presidio-analyzer/presidio_analyzer/analyzer_request.py +++ b/presidio-analyzer/presidio_analyzer/analyzer_request.py @@ -37,5 +37,6 @@ def __init__(self, req_data: Dict): self.context = req_data.get("context") self.allow_list = req_data.get("allow_list") self.allow_list_match = req_data.get("allow_list_match", "exact") - self.regex_flags = req_data.get("regex_flags", - re.DOTALL | re.MULTILINE | re.IGNORECASE) + self.regex_flags = req_data.get( + "regex_flags", re.DOTALL | re.MULTILINE | re.IGNORECASE + ) diff --git a/presidio-analyzer/presidio_analyzer/conf/default_analyzer_full.yaml b/presidio-analyzer/presidio_analyzer/conf/default_analyzer_full.yaml new file mode 100644 index 0000000000..a3603e3947 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/conf/default_analyzer_full.yaml @@ -0,0 +1,154 @@ +supported_languages: + - en +default_score_threshold: 0 +nlp_configuration: + nlp_engine_name: spacy + models: + - lang_code: en + model_name: en_core_web_lg + + ner_model_configuration: + model_to_presidio_entity_mapping: + PER: PERSON + PERSON: PERSON + NORP: NRP + FAC: LOCATION + LOC: LOCATION + LOCATION: LOCATION + GPE: LOCATION + ORG: ORGANIZATION + ORGANIZATION: ORGANIZATION + DATE: DATE_TIME + TIME: DATE_TIME + + low_confidence_score_multiplier: 0.4 + low_score_entity_names: + - + labels_to_ignore: + - ORG + - ORGANIZATION # has many false positives + - CARDINAL + - EVENT + - LANGUAGE + - LAW + - MONEY + - ORDINAL + - PERCENT + - PRODUCT + - QUANTITY + - WORK_OF_ART + + +recognizer_registry: + # global_regex_flags: 26 + recognizers: + # Recognizers listed here can either be 
loaded from the recognizers defined in code (type: predefined), + # or created based on the provided configuration (type: custom). + # For predefined: + # - If only a recognizer name is provided, a predefined recognizer with this name and default parameters will be loaded. + # - If a parameter isn't provided, the default one would be loaded. + # For custom: + # - See an example configuration here: https://github.com/microsoft/presidio/blob/main/presidio-analyzer/presidio_analyzer/conf/example_recognizers.yaml + # - Custom pattern recognizers with this configuration can be added to this file, with type: custom + # For recognizers supporting more than one language, an instance of the recognizer for each language will be created. + # For example, see the CreditCardRecognizer definition below: + - name: CreditCardRecognizer + supported_languages: + - language: en + context: [credit, card, visa, mastercard, cc, amex, discover, jcb, diners, maestro, instapayment] + type: predefined + + - name: UsBankRecognizer + type: predefined + + - name: UsLicenseRecognizer + type: predefined + + - name: UsItinRecognizer + type: predefined + + - name: UsPassportRecognizer + type: predefined + + - name: UsSsnRecognizer + type: predefined + + - name: NhsRecognizer + type: predefined + + - name: UkNinoRecognizer + type: predefined + enabled: false + + - name: SgFinRecognizer + type: predefined + enabled: false + + - name: AuAbnRecognizer + type: predefined + enabled: false + + - name: AuAcnRecognizer + type: predefined + enabled: false + + - name: AuTfnRecognizer + type: predefined + enabled: false + + - name: AuMedicareRecognizer + type: predefined + enabled: false + + - name: InPanRecognizer + type: predefined + enabled: false + + - name: InAadhaarRecognizer + supported_languages: + - en + type: predefined + enabled: false + + - name: InVehicleRegistrationRecognizer + type: predefined + enabled: false + + - name: InPassportRecognizer + type: predefined + enabled: false + + - name: 
CryptoRecognizer + type: predefined + + - name: DateRecognizer + type: predefined + + - name: EmailRecognizer + type: predefined + + - name: IbanRecognizer + type: predefined + + - name: IpRecognizer + type: predefined + + - name: MedicalLicenseRecognizer + type: predefined + + - name: PhoneRecognizer + type: predefined + + - name: UrlRecognizer + type: predefined + + - name: InVoterRecognizer + type: predefined + enabled: false + + - name: InGstinRecognizer + type: predefined + enabled: false + + - name: SpacyRecognizer + type: predefined diff --git a/presidio-analyzer/presidio_analyzer/context_aware_enhancers/__init__.py b/presidio-analyzer/presidio_analyzer/context_aware_enhancers/__init__.py index a6667dc57d..fbcfdf91bc 100644 --- a/presidio-analyzer/presidio_analyzer/context_aware_enhancers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/context_aware_enhancers/__init__.py @@ -1,4 +1,5 @@ """Context awareness modules.""" + from .context_aware_enhancer import ContextAwareEnhancer from .lemma_context_aware_enhancer import LemmaContextAwareEnhancer diff --git a/presidio-analyzer/presidio_analyzer/entity_recognizer.py b/presidio-analyzer/presidio_analyzer/entity_recognizer.py index 07fccc0727..9f05e9b6e8 100644 --- a/presidio-analyzer/presidio_analyzer/entity_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/entity_recognizer.py @@ -1,9 +1,11 @@ import logging from abc import abstractmethod -from typing import Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from presidio_analyzer import RecognizerResult -from presidio_analyzer.nlp_engine import NlpArtifacts + +if TYPE_CHECKING: + from presidio_analyzer.nlp_engine import NlpArtifacts logger = logging.getLogger("presidio-analyzer") @@ -74,7 +76,7 @@ def load(self) -> None: @abstractmethod def analyze( - self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts + self, text: str, entities: List[str], nlp_artifacts: "NlpArtifacts" ) -> 
List[RecognizerResult]: """ Analyze text to identify entities. @@ -92,7 +94,7 @@ def enhance_using_context( text: str, raw_recognizer_results: List[RecognizerResult], other_raw_recognizer_results: List[RecognizerResult], - nlp_artifacts: NlpArtifacts, + nlp_artifacts: "NlpArtifacts", context: Optional[List[str]] = None, ) -> List[RecognizerResult]: """Enhance confidence score using context of the entity. diff --git a/presidio-analyzer/presidio_analyzer/input_validation/__init__.py b/presidio-analyzer/presidio_analyzer/input_validation/__init__.py new file mode 100644 index 0000000000..aadea64675 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/input_validation/__init__.py @@ -0,0 +1,21 @@ +"""Configuration validation module for Presidio.""" + +from .language_validation import validate_language_codes +from .schemas import ConfigurationValidator +from .yaml_recognizer_models import ( + BaseRecognizerConfig, + CustomRecognizerConfig, + LanguageContextConfig, + PredefinedRecognizerConfig, + RecognizerRegistryConfig, +) + +__all__ = [ + "validate_language_codes", + "ConfigurationValidator", + "BaseRecognizerConfig", + "CustomRecognizerConfig", + "LanguageContextConfig", + "PredefinedRecognizerConfig", + "RecognizerRegistryConfig", +] diff --git a/presidio-analyzer/presidio_analyzer/input_validation/language_validation.py b/presidio-analyzer/presidio_analyzer/input_validation/language_validation.py new file mode 100644 index 0000000000..c5f0171624 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/input_validation/language_validation.py @@ -0,0 +1,18 @@ +from typing import List + +import regex as re + + +def validate_language_codes(languages: List[str]) -> None: + """Validate language codes format. + + :param languages: List of languages to validate. + """ + language_code_regex = re.compile(r"^[a-z]{2}(-[A-Z]{2})?$") + + for lang in languages: + if not re.match(language_code_regex, lang): + raise ValueError( + f"Invalid language code format: {lang}. 
" + f"Expected format: 'en' or 'en-US'" + ) diff --git a/presidio-analyzer/presidio_analyzer/input_validation/schemas.py b/presidio-analyzer/presidio_analyzer/input_validation/schemas.py new file mode 100644 index 0000000000..a256ca2a58 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/input_validation/schemas.py @@ -0,0 +1,132 @@ +from pathlib import Path +from typing import Any, Dict, List, Union + +from pydantic import ValidationError + +from . import validate_language_codes +from .yaml_recognizer_models import RecognizerRegistryConfig + + +class ConfigurationValidator: + """Class for validating configurations using Pydantic-enabled classes.""" + + @staticmethod + def validate_language_codes(languages: List[str]) -> List[str]: + """Validate language codes format. + + :param languages: List of languages to validate. + """ + validate_language_codes(languages) + return languages + + @staticmethod + def validate_file_path(file_path: Union[str, Path]) -> Path: + """Validate file path exists and is readable. + + :param file_path: Path to validate. + """ + path = Path(file_path) + if not path.exists(): + raise ValueError(f"Configuration file does not exist: {path}") + if not path.is_file(): + raise ValueError(f"Path is not a file: {path}") + return path + + @staticmethod + def validate_score_threshold(threshold: float) -> float: + """Validate score threshold is within valid range. + + :param threshold: score threshold to validate. + """ + if not 0.0 <= threshold <= 1.0: + raise ValueError( + f"Score threshold must be between 0.0 and 1.0, got: {threshold}" + ) + return threshold + + @staticmethod + def validate_nlp_configuration(config: Dict[str, Any]) -> Dict[str, Any]: + """Validate NLP configuration structure. + + :param config: NLP Configuration to validate. 
+ """ + if not isinstance(config, dict): + raise ValueError("NLP configuration must be a dictionary") + + required_fields = ["nlp_engine_name", "models"] + missing_fields = [field for field in required_fields if field not in config] + if missing_fields: + raise ValueError( + f"NLP configuration missing required fields: {missing_fields}" + ) + + # Validate models structure + if not isinstance(config["models"], list) or not config["models"]: + raise ValueError("Models must be a non-empty list") + + for model in config["models"]: + if not isinstance(model, dict): + raise ValueError("Each model must be a dictionary") + if "lang_code" not in model or "model_name" not in model: + raise ValueError("Each model must have 'lang_code' and 'model_name'") + + return config + + @staticmethod + def validate_recognizer_registry_configuration( + config: Dict[str, Any], + ) -> Dict[str, Any]: + """Validate recognizer registry configuration using Pydantic models.""" + try: + # Use Pydantic model for validation + validated_config = RecognizerRegistryConfig(**config) + # Use model_dump() without exclude_unset to include default values + return validated_config.model_dump(exclude_unset=False) + except ValidationError as e: + raise ValueError("Invalid recognizer registry configuration") from e + + @staticmethod + def validate_analyzer_configuration(config: Dict[str, Any]) -> Dict[str, Any]: + """Validate analyzer engine configuration.""" + if not isinstance(config, dict): + raise ValueError("Analyzer configuration must be a dictionary") + + # Define valid top-level keys for analyzer configuration + valid_keys = { + "supported_languages", + "default_score_threshold", + "nlp_configuration", + "recognizer_registry", + } + + # Check for unknown keys + unknown_keys = set(config.keys()) - valid_keys + if unknown_keys: + raise ValueError( + f"Unknown configuration key(s) in " + f"analyzer configuration: {sorted(unknown_keys)}. 
" + f"Valid keys are: {sorted(valid_keys)}" + ) + + # Validate supported languages if present + if "supported_languages" in config: + validate_language_codes(config["supported_languages"]) + + # Validate score threshold if present + if "default_score_threshold" in config: + ConfigurationValidator.validate_score_threshold( + config["default_score_threshold"] + ) + + # Validate nested configurations + if "nlp_configuration" in config: + ConfigurationValidator.validate_nlp_configuration( + config["nlp_configuration"] + ) + + if "recognizer_registry" in config: + ConfigurationValidator.validate_recognizer_registry_configuration( + config["recognizer_registry"] + ) + + return config diff --git a/presidio-analyzer/presidio_analyzer/input_validation/yaml_recognizer_models.py b/presidio-analyzer/presidio_analyzer/input_validation/yaml_recognizer_models.py new file mode 100644 index 0000000000..ea7c2c81f4 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/input_validation/yaml_recognizer_models.py @@ -0,0 +1,423 @@ +"""Pydantic models for YAML recognizer configurations.""" + +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator + +from presidio_analyzer.input_validation import validate_language_codes +from presidio_analyzer.recognizer_registry.recognizers_loader_utils import ( + PredefinedRecognizerNotFoundError, + RecognizerListLoader, +) + + +class LanguageContextConfig(BaseModel): + """Configuration for language-specific validation with context words. 
+ + :param language: Language code (e.g., 'en', 'es') + :param context: Context words for this language + """ + + language: str = Field(..., description="Language code (e.g., 'en', 'es')") + context: Optional[List[str]] = Field( + default=None, description="Context words for this language" + ) + + @field_validator("language") + @classmethod + def validate_language_code(cls, v: str) -> str: + """Validate language code format.""" + validate_language_codes([v]) + return v + + +class BaseRecognizerConfig(BaseModel): + """Base validation for all recognizer configuration types. + + :param name: Name of the recognizer + :param enabled: Whether the recognizer is enabled + :param type: Type of recognizer (predefined/custom) + :param supported_language: Single supported language (legacy) + :param supported_languages: Multiple supported languages with optional context. + Passing multiple languages will result in multiple actual + recognizers initialized in Presidio. + :param context: context words. Context is best defined + in the language-specific configuration, + as it is language-dependent. If context is defined outside, + it should only work if the user passed one language + (either in supported_language or have a supported_languages with length 1). + :param supported_entity: Supported entity for this recognizer (legacy) + :param supported_entities: List of supported entities for this recognizer. 
+ """ + + name: str = Field(..., description="Name of the recognizer") + enabled: bool = Field(default=True, description="Whether the recognizer is enabled") + type: Optional[str] = Field( + default="predefined", description="Type of recognizer (predefined/custom)" + ) + supported_language: Optional[str] = Field( + default=None, description="The language this recognizer supports" + ) + supported_languages: Optional[Union[List[str], List[LanguageContextConfig]]] = ( + Field( + default=None, + description="Multiple supported languages with optional context", + ) + ) + context: Optional[List[str]] = Field( + default=None, description="Global context words" + ) + supported_entity: Optional[str] = Field( + default=None, description="Supported entity for this recognizer" + ) + supported_entities: Optional[List[str]] = Field( + default=None, description="List of supported entities " "for this recognizer" + ) + + @field_validator("supported_language") + @classmethod + def validate_single_language(cls, v: Optional[str]) -> Optional[str]: + """Validate single language code format.""" + validate_language_codes([v]) + return v + + @model_validator(mode="after") + def validate_language_configuration(self): + """Ensure proper language validation.""" + if self.supported_language and self.supported_languages: + raise ValueError( + "Cannot specify both 'supported_language' and 'supported_languages'" + ) + + return self + + @model_validator(mode="after") + def validate_entity_configuration(self): + """Ensure proper entity validation.""" + # Check if user provided both (before we modify them) + user_provided_both = ( + self.supported_entity is not None and self.supported_entities is not None + ) + + if user_provided_both: + raise ValueError( + f"Recognizer {self.name} has both " + "'supported_entity' and 'supported_entities' specified." 
+ ) + + return self + + @model_validator(mode="after") + def validate_context_configuration(self): + """Validate context configuration according to language settings.""" + # Check if global context is defined + if self.context: + # Global context is only valid if we have exactly one language + if self.supported_languages and len(self.supported_languages) > 1: + raise ValueError( + "Global context can only be used with a single language. " + "For multiple languages, define context in " + "language-specific configurations." + "Example: " + " supported_languages: " + " - language: en " + " context: [credit, card, visa, mastercard] " + " - language: es " + " context: [tarjeta, credito, visa, mastercard] " + ) + return self + + +class PredefinedRecognizerConfig(BaseRecognizerConfig): + """Configuration for predefined recognizers.""" + + type: str = Field(default="predefined", description="Type of recognizer") + + @model_validator(mode="after") + def validate_predefined_recognizer_exists(self): + """Validate that the predefined recognizer class actually exists.""" + try: + RecognizerListLoader.get_existing_recognizer_cls(self.name) + except PredefinedRecognizerNotFoundError as e: + raise ValueError( + f"Predefined recognizer '{self.name}' not found: {str(e)}" + ) from e + return self + + +class CustomRecognizerConfig(BaseRecognizerConfig): + """Configuration for custom pattern-based recognizers.""" + + type: str = Field(default="custom", description="Type of recognizer") + supported_entity: str = Field( + ..., description="Entity type this recognizer detects" + ) + patterns: Optional[List[Dict[str, Any]]] = Field( + default=None, description="List of patterns" + ) + context: Optional[List[str]] = Field( + default=None, description="Global context words" + ) + deny_list: Optional[List[str]] = Field( + default=None, description="Words to deny/exclude" + ) + deny_list_score: Optional[float] = Field( + default=0.0, ge=0.0, le=1.0, description="Deny list score" + ) + + # 
Language validation (legacy and new formats) + supported_language: Optional[str] = Field( + default=None, description="Single supported language (legacy)" + ) + supported_languages: Optional[Union[List[str], List[LanguageContextConfig]]] = ( + Field( + default=None, + description="Multiple supported languages with optional context", + ) + ) + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @model_validator(mode="before") + @classmethod + def check_predefined_name_conflict(cls, data: Any) -> Any: + """Check if custom recognizer name conflicts with predefined recognizer. + + This validation runs BEFORE field validation to provide a clearer error message + when someone tries to use a predefined recognizer name for a custom recognizer. + """ + if isinstance(data, dict): + name = data.get("name") + if name: + try: + RecognizerListLoader.get_existing_recognizer_cls(name) + # If we reach here, the recognizer IS predefined, so raise an error + raise ValueError( + f"Recognizer '{name}' conflicts with a predefined " + f"recognizer. " + f"Custom recognizers cannot use the same name " + f"as predefined recognizers. " + f"Either use type: 'predefined' or choose a different name " + f"for your custom recognizer." + ) + except PredefinedRecognizerNotFoundError: + # Name is not a predefined recognizer, + # which is fine for custom recognizers + pass + return data + + @field_validator("patterns") + @classmethod + def validate_patterns(cls, patterns: Optional[List[Dict]]) -> Optional[List[Dict]]: + """Validate single language code format. 
+ + :param patterns: List of patterns + """ + if patterns and not isinstance(patterns, list): + raise ValueError(f"Patterns should be a list: {patterns}") + + for pattern in patterns: + if not isinstance(pattern, dict): + raise ValueError(f"Pattern should be a dict: {pattern}") + if "name" not in pattern: + raise ValueError(f"Pattern should contain a name field: {pattern}") + if "regex" not in pattern: + raise ValueError(f"Pattern should contain a regex field: {pattern}") + if "score" not in pattern: + raise ValueError(f"Pattern should contain a score field: {pattern}") + if not isinstance(pattern["score"], (int, float)): + raise ValueError(f"Pattern score should be a float: {pattern}") + if not (0.0 <= pattern["score"] <= 1.0): + raise ValueError(f"Pattern score should be between 0 and 1: {pattern}") + return patterns + + @model_validator(mode="after") + def validate_patterns_or_deny_list(self): + """Ensure custom recognizer has at least patterns or deny_list.""" + if not self.patterns and not self.deny_list: + raise ValueError( + "Custom recognizer must have at least one " + "of 'patterns' or 'deny_list'" + ) + return self + + +class RecognizerRegistryConfig(BaseModel): + """Complete validation for the recognizer registry.""" + + supported_languages: Optional[List[str]] = Field( + default=None, description="List of supported languages" + ) + global_regex_flags: int = Field(default=26, description="Global regex flags") + recognizers: List[ + Union[PredefinedRecognizerConfig, CustomRecognizerConfig, str] + ] = Field(default_factory=list, description="List of recognizer configurations") + + model_config = ConfigDict(extra="forbid") + + @field_validator("supported_languages") + @classmethod + def validate_language_codes( + cls, languages: Optional[List[str]] + ) -> Optional[List[str]]: + """Validate language codes format.""" + + # Allow None or empty list for cases where languages will be inferred + if languages is None: + return None + + if len(languages) == 0: + 
return [] + + validate_language_codes(languages) + return languages + + @model_validator(mode="after") + def validate_languages_for_custom_recognizers(self): + """Validate that custom recognizers have language configuration.""" + # If we have custom recognizers, we need language configuration somewhere + custom_recognizers = [ + rec for rec in self.recognizers if isinstance(rec, CustomRecognizerConfig) + ] + for recognizer in custom_recognizers: + if not recognizer.supported_language and not recognizer.supported_languages: + # If no language config on recognizer, we need global languages + if not self.supported_languages: + raise ValueError( + f"Language configuration missing for custom recognizer " + f"'{recognizer.name}': " + "Either specify 'supported_languages' " + "on the recognizer or provide " + "global 'supported_languages' in the " + "registry configuration." + ) + + return self + + @model_validator(mode="after") + def validate_recognizers_not_empty(self): + """Ensure recognizers list is not empty after all defaults are applied.""" + if not self.recognizers: + raise ValueError( + "The 'recognizers' field must contain at least one recognizer. " + "Found an empty recognizers list." + ) + return self + + @field_validator("recognizers", mode="before") + @classmethod + def parse_recognizers( + cls, recognizers: List[Union[Dict[str, Any], str]] + ) -> List[BaseRecognizerConfig]: + """Parse recognizers from various input formats without duplication.""" + if recognizers is None: + raise ValueError( + "Configuration error: 'recognizers' is required. " + "Please provide a list of recognizers in the configuration." + ) + + if not isinstance(recognizers, list): + raise ValueError("Recognizers must be a list") + + if len(recognizers) == 0: + raise ValueError( + "The 'recognizers' field must contain at least one recognizer. " + "Found an empty recognizers list." 
+ ) + + parsed_recognizers = [] + for recognizer in recognizers: + if isinstance(recognizer, str): + # Simple string recognizer name - treat as predefined + parsed_recognizers.append(recognizer) + continue + + if isinstance(recognizer, dict): + recognizer_type = recognizer.get("type") + + # Validate conflicting custom-only fields if explicitly predefined + if recognizer_type == "predefined" and ( + "patterns" in recognizer or "deny_list" in recognizer + ): + raise ValueError( + f"Recognizer '{recognizer.get('name')}' is marked " + f"as 'predefined' but contains 'patterns' or 'deny_list' " + f"which are only valid for custom recognizers. " + f"Either use type: 'custom' or remove these fields." + ) + + # Auto-detect type if not provided + if not recognizer_type: + if "patterns" in recognizer or "deny_list" in recognizer: + recognizer_type = "custom" + recognizer_name = recognizer.get("name") + if recognizer_name: + cls.__check_if_predefined(recognizer_name) + else: + recognizer_type = "predefined" + recognizer["type"] = recognizer_type + + # Final append based on resolved type (only once) + if recognizer_type == "predefined": + parsed_recognizers.append(PredefinedRecognizerConfig(**recognizer)) + elif recognizer_type == "custom": + parsed_recognizers.append(CustomRecognizerConfig(**recognizer)) + else: + raise ValueError( + f"Invalid recognizer type: {recognizer_type}. " + f"Must be 'predefined' or 'custom'." + ) + continue + + # Fallback: unrecognized structure, keep as-is + parsed_recognizers.append(recognizer) + + return parsed_recognizers + + @classmethod + def __check_if_predefined(cls, recognizer_name: Optional[Any]) -> None: + try: + RecognizerListLoader.get_existing_recognizer_cls(recognizer_name) + # If we reach here, it IS a predefined recognizer, so raise an error + raise ValueError( + f"Recognizer '{recognizer_name}' conflicts with a predefined " + f"recognizer. " + f"Custom recognizers cannot use the same name " + f"as predefined recognizers. 
" + f"Either use type: 'predefined' or choose a different name " + f"for your custom recognizer." + ) + except PredefinedRecognizerNotFoundError: + # Name is not a predefined recognizer, which is fine for custom recognizers + pass + + @model_validator(mode="after") + def validate_language_presence(self): + """Ensure custom recognizers define languages if no global languages are set.""" + if self.recognizers and ( + not self.supported_languages or len(self.supported_languages) == 0 + ): + any_language_defined = False + custom_without_language_present = False + for r in self.recognizers: + if isinstance(r, (PredefinedRecognizerConfig, CustomRecognizerConfig)): + # Track if any language is defined + if (r.supported_language and r.supported_language.strip()) or ( + r.supported_languages and len(r.supported_languages) > 0 + ): + any_language_defined = True + # Track custom recognizers lacking language info + if ( + isinstance(r, CustomRecognizerConfig) + and not r.supported_language + and not r.supported_languages + ): + custom_without_language_present = True + + if custom_without_language_present and not any_language_defined: + raise ValueError( + "Language configuration missing for custom recognizer(s): " + "provide 'supported_languages' at registry level " + "or specify languages for each custom recognizer." 
+ ) + return self diff --git a/presidio-analyzer/presidio_analyzer/lm_recognizer.py b/presidio-analyzer/presidio_analyzer/lm_recognizer.py index 64adfdb044..7a79f0b163 100644 --- a/presidio-analyzer/presidio_analyzer/lm_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/lm_recognizer.py @@ -1,6 +1,6 @@ import logging from abc import ABC, abstractmethod -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from presidio_analyzer import RecognizerResult, RemoteRecognizer from presidio_analyzer.llm_utils import ( @@ -12,7 +12,9 @@ skip_unmapped_entities, validate_result_positions, ) -from presidio_analyzer.nlp_engine import NlpArtifacts + +if TYPE_CHECKING: + from presidio_analyzer.nlp_engine import NlpArtifacts logger = logging.getLogger("presidio-analyzer") @@ -90,7 +92,7 @@ def analyze( self, text: str, entities: Optional[List[str]] = None, - nlp_artifacts: Optional[NlpArtifacts] = None + nlp_artifacts: Optional["NlpArtifacts"] = None ) -> List[RecognizerResult]: """Analyze text for PII/PHI using LLM.""" if not text or not text.strip(): diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/ner_model_configuration.py b/presidio-analyzer/presidio_analyzer/nlp_engine/ner_model_configuration.py index 54e55426e7..3f813f015e 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/ner_model_configuration.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/ner_model_configuration.py @@ -1,6 +1,7 @@ import logging -from dataclasses import dataclass -from typing import Collection, Dict, Optional, Type +from typing import Collection, Dict, Optional + +from pydantic import BaseModel, ConfigDict, Field, field_validator logger = logging.getLogger("presidio-analyzer") @@ -29,9 +30,8 @@ LOW_SCORE_ENTITY_NAMES = set() -@dataclass -class NerModelConfiguration: - """NER model configuration. +class NerModelConfiguration(BaseModel): + """NER model configuration using Pydantic validation. 
:param labels_to_ignore: List of labels to not return predictions for. :param aggregation_strategy: @@ -48,73 +48,81 @@ class NerModelConfiguration: Multiplier to the score given for low_score_entity_names. """ # noqa: E501 - labels_to_ignore: Optional[Collection[str]] = None - aggregation_strategy: Optional[str] = "max" - stride: Optional[int] = 14 - alignment_mode: Optional[str] = "expand" - default_score: Optional[float] = 0.85 - model_to_presidio_entity_mapping: Optional[Dict[str, str]] = None - low_score_entity_names: Optional[Collection] = None - low_confidence_score_multiplier: Optional[float] = 0.4 - - def __post_init__(self): - """Validate the configuration and set defaults.""" - if self.model_to_presidio_entity_mapping is None: - logger.warning( - "model_to_presidio_entity_mapping is missing from configuration, " - "using default" - ) - self.model_to_presidio_entity_mapping = MODEL_TO_PRESIDIO_ENTITY_MAPPING - if self.low_score_entity_names is None: - logger.warning( - "low_score_entity_names is missing from configuration, " "using default" - ) - self.low_score_entity_names = LOW_SCORE_ENTITY_NAMES - if self.labels_to_ignore is None: + labels_to_ignore: Optional[Collection[str]] = Field( + default_factory=list, description="List of labels to ignore" + ) + aggregation_strategy: Optional[str] = Field( + default="max", description="Token classification aggregation strategy" + ) + stride: Optional[int] = Field( + default=14, description="Stride for token classification" + ) + alignment_mode: Optional[str] = Field( + default="expand", description="Alignment mode for spaCy char spans" + ) + default_score: Optional[float] = Field( + default=0.85, ge=0.0, le=1.0, description="Default confidence score" + ) + model_to_presidio_entity_mapping: Optional[Dict[str, str]] = Field( + default_factory=lambda: MODEL_TO_PRESIDIO_ENTITY_MAPPING.copy(), + description="Mapping between model entities and Presidio entities", + ) + low_score_entity_names: Optional[Collection[str]] 
= Field( + default_factory=lambda: LOW_SCORE_ENTITY_NAMES.copy(), + description="Entity names with likely low detection accuracy", + ) + low_confidence_score_multiplier: Optional[float] = Field( + default=0.4, ge=0.0, description="Score multiplier for low confidence entities" + ) + + model_config = ConfigDict(arbitrary_types_allowed=True) + + @field_validator("aggregation_strategy") + @classmethod + def validate_aggregation_strategy(cls, agg_strategy: str) -> str: + """Validate aggregation strategy.""" + valid_strategies = ["simple", "first", "average", "max"] + if agg_strategy not in valid_strategies: logger.warning( - "labels_to_ignore is missing from configuration, " "using default" + f"Aggregation strategy '{agg_strategy}' might not be supported. " + f"Valid options: {valid_strategies}" ) - self.labels_to_ignore = {} + return agg_strategy + @field_validator("stride") @classmethod - def _validate_input(cls, ner_model_configuration_dict: Dict) -> None: - key_to_type = { - "labels_to_ignore": Collection, - "aggregation_strategy": str, - "alignment_mode": str, - "model_to_presidio_entity_mapping": dict, - "low_confidence_score_multiplier": float, - "low_score_entity_names": Collection, - "stride": int, - } - - for key, field_type in key_to_type.items(): - cls.__validate_type( - config_dict=ner_model_configuration_dict, key=key, field_type=field_type + def validate_stride(cls, stride: Optional[int]) -> int: + """Validate stride and handle None values.""" + if stride is None: + # Get the default value from the field definition + return cls.model_fields["stride"].default + return stride + + @field_validator("alignment_mode") + @classmethod + def validate_alignment_mode(cls, alignment: Optional[str]) -> str: + """Validate alignment mode and handle None values.""" + if alignment is None: + # Get the default value from the field definition + return cls.model_fields["alignment_mode"].default + valid_modes = ["strict", "contract", "expand"] + if alignment not in 
valid_modes: + logger.warning( + f"Alignment mode '{alignment}' might not be supported. " + f"Valid options: {valid_modes}" ) - - @staticmethod - def __validate_type(config_dict: Dict, key: str, field_type: Type) -> None: - if key in config_dict: - if not isinstance(config_dict[key], field_type): - raise ValueError(f"{key} must be of type {field_type}") + return alignment @classmethod - def from_dict(cls, nlp_engine_configuration: Dict) -> "NerModelConfiguration": - """Load NLP engine configuration from dict. - - :param nlp_engine_configuration: Dict with the configuration to load. + def from_dict(cls, ner_model_configuration_dict: Dict) -> "NerModelConfiguration": """ - cls._validate_input(nlp_engine_configuration) + Create NerModelConfiguration from a dictionary with Pydantic validation. - return cls(**nlp_engine_configuration) + :param ner_model_configuration_dict: Dictionary containing configuration + :return: NerModelConfiguration instance + """ + return cls(**ner_model_configuration_dict) def to_dict(self) -> Dict: - """Return the configuration as a dict.""" - return self.__dict__ - - def __str__(self) -> str: # noqa: D105 - return str(self.to_dict()) - - def __repr__(self) -> str: # noqa: D105 - return str(self) + """Convert to dictionary representation.""" + return self.model_dump(exclude_none=True) diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine_provider.py b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine_provider.py index 921c87190d..74713c6f14 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine_provider.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine_provider.py @@ -4,6 +4,7 @@ import yaml +from presidio_analyzer.input_validation import ConfigurationValidator from presidio_analyzer.nlp_engine import ( NerModelConfiguration, NlpEngine, @@ -38,9 +39,7 @@ def __init__( conf_file: Optional[Union[Path, str]] = None, nlp_configuration: Optional[Dict] = None, ): - if nlp_engines: - 
self._validate_nlp_engines(nlp_engines) - else: + if nlp_engines is None: nlp_engines = (SpacyNlpEngine, StanzaNlpEngine, TransformersNlpEngine) self.nlp_engines = { @@ -56,168 +55,60 @@ def __init__( ) if nlp_configuration: - self._validate_nlp_configuration(nlp_configuration) + ConfigurationValidator.validate_nlp_configuration(nlp_configuration) self.nlp_configuration = nlp_configuration - if conf_file or conf_file == '': - self._validate_conf_file_path(conf_file) + if conf_file or conf_file == "": + if conf_file == "": + raise ValueError("conf_file is empty") + ConfigurationValidator.validate_file_path(conf_file) self.nlp_configuration = self._read_nlp_conf(conf_file) if conf_file is None and nlp_configuration is None: conf_file = self._get_full_conf_path() logger.debug(f"Reading default conf file from {conf_file}") self.nlp_configuration = self._read_nlp_conf(conf_file) + ConfigurationValidator.validate_nlp_configuration(self.nlp_configuration) @staticmethod - def _validate_nlp_engines(nlp_engines: Tuple) -> None: - """ - Validate that all NLP engine classes have the required attributes. - - :param nlp_engines: Tuple of NLP engine classes to validate. - """ - - if not isinstance(nlp_engines, tuple): - raise ValueError(f"nlp_engines must be a tuple, got {type(nlp_engines)}") - - required_attributes = ['engine_name', 'is_available'] - - for engine_class in nlp_engines: - missing_attributes = [] - - for attr in required_attributes: - if not hasattr(engine_class, attr): - missing_attributes.append(attr) - - if missing_attributes: - raise ValueError( - f"NLP engine class {engine_class} is missing required " - f"class attributes: {missing_attributes}. " - "All NLP engine classes must have 'engine_name' and 'is_available' " - "as class attributes." - ) - - if not isinstance(engine_class.engine_name, str): - raise ValueError( - f"NLP engine class {engine_class} has invalid " - f"'engine_name' attribute. Expected string, " - f"got {type(engine_class.engine_name)}." 
- ) - - if not isinstance(engine_class.is_available, bool): - raise ValueError( - f"NLP engine class {engine_class} has invalid " - f"'is_available' attribute. Expected boolean, " - f"got {type(engine_class.is_available)}." - ) - - @staticmethod - def _validate_nlp_configuration(nlp_configuration: Dict) -> None: - """ - Validate the NLP configuration structure and content. - - :param nlp_configuration: The configuration dictionary to validate - """ - if not isinstance(nlp_configuration, Dict): - raise ValueError(f"nlp_configuration must be a dictionary, " - f"got {type(nlp_configuration)}") - - required_fields = ['nlp_engine_name', 'models'] - missing_fields = [] - - for field in required_fields: - if field not in nlp_configuration.keys(): - missing_fields.append(field) + def _read_nlp_conf(conf_file: Union[Path, str]) -> Dict: + """Read NLP configuration from a YAML file.""" + with open(conf_file) as file: + return yaml.safe_load(file) - if missing_fields: - raise ValueError( - f"nlp_configuration is missing required fields: {missing_fields}. " - f"Required fields are: {required_fields}" - ) @staticmethod - def _validate_conf_file_path(conf_file: Union[Path, str]) -> None: - """ - Validate the conf file path. 
- - :param conf_file: The conf file path to validate - """ - - if conf_file == '': - raise ValueError("conf_file is empty") - - if not isinstance(conf_file, (Path, str)): - raise ValueError(f"conf_file must be a string or Path, " - f"got {type(conf_file)}") - - if not Path(conf_file).exists(): - raise ValueError(f"conf_file {conf_file} does not exist") - - if Path(conf_file).is_dir(): - raise ValueError(f"conf_file {conf_file} is a directory, not a file") + def _get_full_conf_path( + default_conf_file: Union[Path, str] = "default.yaml" + ) -> Path: + """Return a Path to the default conf file.""" + return Path(Path(__file__).parent, "../conf", default_conf_file) def create_engine(self) -> NlpEngine: """Create an NLP engine instance.""" - if ( - not self.nlp_configuration - or not self.nlp_configuration.get("models") - or not self.nlp_configuration.get("nlp_engine_name") - ): - raise ValueError( - "Illegal nlp configuration. " - "Configuration should include nlp_engine_name and models " - "(list of model_name for each lang_code)." - ) + # Configuration is already validated by Pydantic in __init__ nlp_engine_name = self.nlp_configuration["nlp_engine_name"] if nlp_engine_name not in self.nlp_engines: raise ValueError( f"NLP engine '{nlp_engine_name}' is not available. " "Make sure you have all required packages installed" ) - try: - nlp_engine_class = self.nlp_engines[nlp_engine_name] - nlp_models = self.nlp_configuration["models"] - ner_model_configuration = self.nlp_configuration.get( - "ner_model_configuration" - ) - if ner_model_configuration: - ner_model_configuration = NerModelConfiguration.from_dict( - ner_model_configuration - ) + nlp_engine_class = self.nlp_engines[nlp_engine_name] + nlp_models = self.nlp_configuration["models"] - engine = nlp_engine_class( - models=nlp_models, ner_model_configuration=ner_model_configuration - ) - engine.load() - logger.info( - f"Created NLP engine: {engine.engine_name}. 
" - f"Loaded models: {list(engine.nlp.keys())}" + ner_model_configuration = self.nlp_configuration.get("ner_model_configuration") + if ner_model_configuration: + ner_model_configuration = NerModelConfiguration.from_dict( + ner_model_configuration ) - return engine - except KeyError: - raise ValueError("Wrong NLP engine configuration") - - @staticmethod - def _read_nlp_conf(conf_file: Union[Path, str]) -> dict: - """ - Read the nlp configuration from a provided yaml file. - :param conf_file: The conf file path to read - """ - - with open(conf_file) as file: - nlp_configuration = yaml.safe_load(file) - - if "ner_model_configuration" not in nlp_configuration: - logger.warning( - "configuration file is missing 'ner_model_configuration'. Using default" - ) - - return nlp_configuration - - @staticmethod - def _get_full_conf_path( - default_conf_file: Union[Path, str] = "default.yaml", - ) -> Path: - """Return a Path to the default conf file.""" - return Path(Path(__file__).parent.parent, "conf", default_conf_file) + engine = nlp_engine_class( + models=nlp_models, ner_model_configuration=ner_model_configuration + ) + engine.load() + logger.info( + f"Created NLP engine: {engine.engine_name}. 
" + f"Loaded models: {list(engine.nlp.keys())}" + ) + return engine diff --git a/presidio-analyzer/presidio_analyzer/nlp_engine/stanza_nlp_engine.py b/presidio-analyzer/presidio_analyzer/nlp_engine/stanza_nlp_engine.py index fc39e27d40..079587a8c2 100644 --- a/presidio-analyzer/presidio_analyzer/nlp_engine/stanza_nlp_engine.py +++ b/presidio-analyzer/presidio_analyzer/nlp_engine/stanza_nlp_engine.py @@ -298,7 +298,7 @@ def __call__(self, text): f"expansion or because the character offsets don't map to " f"valid tokens produced by the Stanza tokenizer:\n" f"Words: {words}\n" - f"Entities: {[(e.text, e.type, e.start_char, e.end_char) for e in snlp_doc.entities]}", # noqa + f"Entities: {[(e.text, e.type, e.start_char, e.end_char) for e in snlp_doc.entities]}", # noqa stacklevel=4, ) else: @@ -375,7 +375,7 @@ def __get_words_and_spaces(words, text): text_spaces.append(False) return text_words, text_spaces - def token_vector(self, token:Token): + def token_vector(self, token: Token): """Get Stanza's pretrained word embedding for given token. 
:param token: The token whose embedding will be returned diff --git a/presidio-analyzer/presidio_analyzer/pattern.py b/presidio-analyzer/presidio_analyzer/pattern.py index f37f8052d2..a4a3909bb4 100644 --- a/presidio-analyzer/presidio_analyzer/pattern.py +++ b/presidio-analyzer/presidio_analyzer/pattern.py @@ -1,6 +1,8 @@ import json from typing import Dict +import regex as re + class Pattern: """ @@ -18,6 +20,24 @@ def __init__(self, name: str, regex: str, score: float): self.compiled_regex = None self.compiled_with_flags = None + self.__validate_regex(self.regex) + self.__validate_score(self.score) + + @staticmethod + def __validate_regex(pattern: str) -> None: + """Validate that the regex pattern is valid.""" + try: + re.compile(pattern) + except re.error as e: + raise ValueError(f"Invalid regex pattern: {e}") + + @staticmethod + def __validate_score(score: float) -> None: + if score < 0 or score > 1: + raise ValueError( + f"Invalid score: {score}. " "Score should be between 0 and 1" + ) + def to_dict(self) -> Dict: """ Turn this instance into a dictionary. 
diff --git a/presidio-analyzer/presidio_analyzer/pattern_recognizer.py b/presidio-analyzer/presidio_analyzer/pattern_recognizer.py index 1ea918d7db..ad6053355b 100644 --- a/presidio-analyzer/presidio_analyzer/pattern_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/pattern_recognizer.py @@ -1,6 +1,6 @@ import datetime import logging -from typing import Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional import regex as re @@ -11,7 +11,9 @@ Pattern, RecognizerResult, ) -from presidio_analyzer.nlp_engine import NlpArtifacts + +if TYPE_CHECKING: + from presidio_analyzer.nlp_engine import NlpArtifacts logger = logging.getLogger("presidio-analyzer") @@ -79,7 +81,7 @@ def analyze( self, text: str, entities: List[str], - nlp_artifacts: Optional[NlpArtifacts] = None, + nlp_artifacts: Optional["NlpArtifacts"] = None, regex_flags: Optional[int] = None, ) -> List[RecognizerResult]: """ @@ -198,7 +200,7 @@ def __analyze_patterns( logger.debug( "--- match_time[%s]: %.6f seconds", pattern.name, - match_time.total_seconds() + match_time.total_seconds(), ) for match in matches: @@ -266,9 +268,30 @@ def to_dict(self) -> Dict: @classmethod def from_dict(cls, entity_recognizer_dict: Dict) -> "PatternRecognizer": """Create instance from a serialized dict.""" + # Make a copy to avoid mutating the input + entity_recognizer_dict = entity_recognizer_dict.copy() + patterns = entity_recognizer_dict.get("patterns") if patterns: patterns_list = [Pattern.from_dict(pat) for pat in patterns] entity_recognizer_dict["patterns"] = patterns_list + # Transform supported_entities (plural) to supported_entity (singular) + # PatternRecognizer only accepts supported_entity (singular) + if ( + "supported_entity" in entity_recognizer_dict + and "supported_entities" in entity_recognizer_dict + ): + raise ValueError( + "Both 'supported_entity' and 'supported_entities' " + "are present in the input dictionary. " + "Only one should be provided." 
+ ) + if "supported_entities" in entity_recognizer_dict: + supported_entities = entity_recognizer_dict.pop("supported_entities") + if supported_entities and len(supported_entities) > 0: + # Only set if not already present + if "supported_entity" not in entity_recognizer_dict: + entity_recognizer_dict["supported_entity"] = supported_entities[0] + return cls(**entity_recognizer_dict) diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/__init__.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/__init__.py index 86797a44e8..c22f86cec4 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/__init__.py @@ -1,6 +1,5 @@ """Predefined recognizers package. Holds all the default recognizers.""" - # Australia recognizers from presidio_analyzer.predefined_recognizers.nlp_engine_recognizers.transformers_recognizer import ( # noqa: E501 TransformersRecognizer, diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/india/in_gstin_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/india/in_gstin_recognizer.py index 13a381551f..14662e609b 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/india/in_gstin_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/india/in_gstin_recognizer.py @@ -90,8 +90,8 @@ def _sanitize_value(self, text: str) -> str: # First, try to extract GSTIN pattern from the text gstin_pattern = ( - r'\b((?:0[1-9]|[1-3][0-7])[A-Za-z]{5}[0-9]{4}[A-Za-z]{1}' - r'[0-9A-Za-z]{1}Z[0-9A-Za-z]{1})\b' + r"\b((?:0[1-9]|[1-3][0-7])[A-Za-z]{5}[0-9]{4}[A-Za-z]{1}" + r"[0-9A-Za-z]{1}Z[0-9A-Za-z]{1})\b" ) match = re.search(gstin_pattern, text.upper()) if match: @@ -129,7 +129,7 @@ def _validate_gstin(self, gstin: str) -> bool: return False # Check 14th character should be 'Z' - if gstin[13] != 'Z': + 
if gstin[13] != "Z": return False # Check 15th character (checksum) diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/thai/th_tnin_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/thai/th_tnin_recognizer.py index e7b33f41ed..39a46eff75 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/thai/th_tnin_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/country_specific/thai/th_tnin_recognizer.py @@ -61,7 +61,6 @@ class ThTninRecognizer(PatternRecognizer): "รหัสปชช", ] - def __init__( self, patterns: Optional[List[Pattern]] = None, @@ -105,7 +104,6 @@ def validate_result(self, pattern_text: str) -> Union[bool, None]: # Validate TNIN checksum (format validation is handled by regex) return self._validate_checksum(sanitized_value) - def _validate_checksum(self, tnin: str) -> bool: """ Validate the checksum of Thai TNIN. diff --git a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry.py b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry.py index 0acab3698e..8c00ad90be 100644 --- a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry.py +++ b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry.py @@ -56,7 +56,7 @@ def __init__( def _create_nlp_recognizer( self, nlp_engine: Optional[NlpEngine] = None, - supported_language: Optional[str] = None + supported_language: Optional[str] = None, ) -> SpacyRecognizer: nlp_recognizer = self.get_nlp_recognizer(nlp_engine) diff --git a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry_provider.py b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry_provider.py index 79c0f2c3bf..143f825133 100644 --- a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry_provider.py +++ 
b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizer_registry_provider.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Optional, Union from presidio_analyzer import EntityRecognizer +from presidio_analyzer.input_validation import ConfigurationValidator from presidio_analyzer.nlp_engine import NlpEngine from presidio_analyzer.predefined_recognizers import SpacyRecognizer from presidio_analyzer.recognizer_registry import RecognizerRegistry @@ -55,6 +56,12 @@ def __init__( self.configuration = RecognizerConfigurationLoader.get( conf_file=conf_file, registry_configuration=registry_configuration ) + + self.configuration = ( + ConfigurationValidator.validate_recognizer_registry_configuration( + self.configuration + ) + ) self.nlp_engine = nlp_engine def create_recognizer_registry(self) -> RecognizerRegistry: diff --git a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizers_loader_utils.py b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizers_loader_utils.py index 358ea9a622..e73f64398f 100644 --- a/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizers_loader_utils.py +++ b/presidio-analyzer/presidio_analyzer/recognizer_registry/recognizers_loader_utils.py @@ -12,6 +12,11 @@ logger = logging.getLogger("presidio-analyzer") +class PredefinedRecognizerNotFoundError(Exception): + """Exception raised when a predefined recognizer is not found.""" + + pass + class RecognizerListLoader: """A utility class that initializes recognizers based on configuration.""" @@ -56,7 +61,8 @@ def _split_recognizers( predefined = [ recognizer_conf for recognizer_conf in recognizers_conf - if ("type" in recognizer_conf and recognizer_conf["type"] == "predefined") + if isinstance(recognizer_conf, dict) + and ("type" in recognizer_conf and recognizer_conf["type"] == "predefined") ] custom = [ recognizer_conf @@ -83,6 +89,7 @@ def _get_recognizer_languages( if ( isinstance(recognizer_conf, str) or "supported_languages" not in 
recognizer_conf + or recognizer_conf["supported_languages"] is None ): return [ { @@ -118,6 +125,13 @@ def get_recognizer_name(recognizer_conf: Union[Dict[str, Any], str]) -> str: return recognizer_conf return recognizer_conf["name"] + @staticmethod + def _convert_supported_entities_to_entity(conf: Dict[str, Any]) -> None: + if "supported_entities" in conf: + supported_entities = conf.pop("supported_entities") + if "supported_entity" not in conf and supported_entities: + conf["supported_entity"] = supported_entities[0] + @staticmethod def _is_language_supported_globally( recognizer: EntityRecognizer, @@ -141,9 +155,19 @@ def _create_custom_recognizers( supported_languages: Iterable[str], ) -> List[PatternRecognizer]: """Create a custom recognizer for each language, based on the provided conf.""" - # legacy recognizer - if "supported_language" in recognizer_conf: - return [PatternRecognizer.from_dict(recognizer_conf)] + # legacy recognizer (has supported_language set to a value, not None) + if recognizer_conf.get("supported_language"): + # Remove supported_languages field (plural) if present, + # as we're using supported_language (singular) + conf_copy = { + k: v for k, v in recognizer_conf.items() if k != "supported_languages" + } + + # Transform supported_entities -> supported_entity + # (PatternRecognizer expects singular) + RecognizerListLoader._convert_supported_entities_to_entity(conf_copy) + + return [PatternRecognizer.from_dict(conf_copy)] recognizers = [] @@ -155,6 +179,11 @@ def _create_custom_recognizers( for k, v in recognizer_conf.items() if k not in ["enabled", "type", "supported_languages"] } + + # Transform supported_entities -> supported_entity + # (PatternRecognizer expects singular) + RecognizerListLoader._convert_supported_entities_to_entity(copied_recognizer) + kwargs = {**copied_recognizer, **supported_language} recognizers.append(PatternRecognizer.from_dict(kwargs)) @@ -182,7 +211,7 @@ def get_all_existing_recognizers( ) @staticmethod - def 
_get_existing_recognizer_cls(recognizer_name: str) -> Type[EntityRecognizer]: + def get_existing_recognizer_cls(recognizer_name: str) -> Type[EntityRecognizer]: """ Get the recognizer class by name. @@ -197,11 +226,62 @@ def _get_existing_recognizer_cls(recognizer_name: str) -> Type[EntityRecognizer] if recognizer_name == recognizer.__name__: return recognizer - raise ValueError( + raise PredefinedRecognizerNotFoundError( f"Recognizer of name {recognizer_name} was not found in the " f"list of recognizers inheriting the EntityRecognizer class" ) + @staticmethod + def _is_pattern_recognizer(recognizer_cls: Type[EntityRecognizer]) -> bool: + """ + Check if a recognizer class inherits from PatternRecognizer. + + :param recognizer_cls: The recognizer class to check. + :return: True if the recognizer inherits from PatternRecognizer. + """ + try: + return issubclass(recognizer_cls, PatternRecognizer) + except TypeError: + return False + + @staticmethod + def _prepare_recognizer_kwargs( + recognizer_conf: Dict[str, Any], + language_conf: Dict[str, Any], + recognizer_cls: Type[EntityRecognizer], + ) -> Dict[str, Any]: + """ + Prepare kwargs for recognizer instantiation. + + Converts supported_entities to supported_entity + for PatternRecognizer subclasses. + Removes both fields if they are None to allow recognizer defaults to be used. + + :param recognizer_conf: The recognizer configuration. + :param language_conf: The language configuration. + :param recognizer_cls: The recognizer class. + :return: Prepared kwargs for recognizer instantiation. 
+ """ + kwargs = {**recognizer_conf, **language_conf} + + # If this is a PatternRecognizer, handle supported_entities/supported_entity + if RecognizerListLoader._is_pattern_recognizer(recognizer_cls): + # Convert supported_entities (plural) to supported_entity + # (singular) if present + RecognizerListLoader._convert_supported_entities_to_entity(kwargs) + + # Remove supported_entity if it's None + # to allow the recognizer's default to be used + if kwargs.get("supported_entity") is None: + kwargs.pop("supported_entity", None) + else: + # For non-PatternRecognizer classes, remove both fields + # as they may not accept these parameters + kwargs.pop("supported_entities", None) + kwargs.pop("supported_entity", None) + + return kwargs + @staticmethod def get( recognizers: Dict[str, Any], @@ -215,32 +295,45 @@ def get( """ recognizer_instances = [] predefined, custom = RecognizerListLoader._split_recognizers(recognizers) + + predefined_to_exclude = {"enabled", "type", "supported_languages", "name"} + + # For custom recognizers, we keep 'supported_languages' + # and don't exclude 'supported_entity' + # because PatternRecognizer needs it + custom_to_exclude = {"enabled", "type"} for recognizer_conf in predefined: for language_conf in RecognizerListLoader._get_recognizer_languages( recognizer_conf=recognizer_conf, supported_languages=supported_languages ): if RecognizerListLoader.is_recognizer_enabled(recognizer_conf): - copied_recognizer_conf = { - k: v - for k, v in RecognizerListLoader._get_recognizer_items( - recognizer_conf=recognizer_conf - ) - if k not in ["enabled", "type", "supported_languages", "name"] - } - kwargs = {**copied_recognizer_conf, **language_conf} + new_conf = RecognizerListLoader._filter_recognizer_fields( + recognizer_conf, to_exclude=predefined_to_exclude + ) + recognizer_name = RecognizerListLoader.get_recognizer_name( recognizer_conf=recognizer_conf ) - recognizer_cls = RecognizerListLoader._get_existing_recognizer_cls( + recognizer_cls = 
RecognizerListLoader.get_existing_recognizer_cls( recognizer_name=recognizer_name ) + + # Prepare kwargs, converting supported_entities + # to supported_entity if needed + kwargs = RecognizerListLoader._prepare_recognizer_kwargs( + new_conf, language_conf, recognizer_cls + ) + recognizer_instances.append(recognizer_cls(**kwargs)) for recognizer_conf in custom: if RecognizerListLoader.is_recognizer_enabled(recognizer_conf): + new_conf = RecognizerListLoader._filter_recognizer_fields( + recognizer_conf, to_exclude=custom_to_exclude + ) recognizer_instances.extend( RecognizerListLoader._create_custom_recognizers( - recognizer_conf=recognizer_conf, + recognizer_conf=new_conf, supported_languages=supported_languages, ) ) @@ -259,6 +352,19 @@ def get( return recognizer_instances + @staticmethod + def _filter_recognizer_fields( + recognizer_conf: Dict[str, Any], to_exclude: Set[str] + ) -> Dict[str, Any]: + copied_recognizer_conf = { + k: v + for k, v in RecognizerListLoader._get_recognizer_items( + recognizer_conf=recognizer_conf + ) + if k not in to_exclude + } + return copied_recognizer_conf + class RecognizerConfigurationLoader: """A utility class that initializes recognizer registry configuration.""" @@ -280,7 +386,6 @@ def _merge_configuration( :param registry_configuration: The configuration to update. :param config_from_file: The configuration coming from the conf file. 
""" - registry_configuration.update( { k: v @@ -289,14 +394,7 @@ def _merge_configuration( } ) - missing_keys = [ - key - for key in RecognizerConfigurationLoader.mandatory_keys - if key not in registry_configuration - ] - if len(missing_keys) > 0: - raise ValueError(f"Missing the following keys: {', '.join(missing_keys)}") - + # Validation is now handled by Pydantic via ConfigurationValidator return registry_configuration @staticmethod @@ -318,14 +416,24 @@ def get( ) configuration = {} + config_from_file = {} + use_defaults = True if registry_configuration: configuration = registry_configuration.copy() + # Check if registry_configuration has all mandatory keys + # Note: supported_languages is now optional, + # so we only check for recognizers + mandatory_keys_set = {"recognizers", "global_regex_flags"} + config_keys = set(configuration.keys()) + if mandatory_keys_set.issubset(config_keys): + use_defaults = False if conf_file: try: with open(conf_file) as file: config_from_file = yaml.safe_load(file) + use_defaults = False except OSError: logger.warning( @@ -334,12 +442,16 @@ def get( ) with open(RecognizerConfigurationLoader._get_full_conf_path()) as file: config_from_file = yaml.safe_load(file) + use_defaults = False except Exception as e: raise ValueError( f"Failed to parse file {conf_file}." 
f"Error: {str(e)}" ) - else: + + # Load defaults if needed (no config provided, + # or registry_configuration is incomplete) + if use_defaults: with open(RecognizerConfigurationLoader._get_full_conf_path()) as file: config_from_file = yaml.safe_load(file) @@ -355,9 +467,27 @@ def get( f"got {type(registry_configuration)}" ) + # Check if config_from_file has any invalid keys + # (keys that aren't mandatory or valid optional keys) + # If it has keys but none of them are mandatory keys, + # it's likely an invalid config + if config_from_file and conf_file: + config_keys = set(config_from_file.keys()) + mandatory_keys_set = {"recognizers"} # Only recognizers is truly mandatory + + # If config has keys but none are mandatory and it's from a conf_file, + # it's probably invalid - don't merge with defaults + if config_keys and not config_keys.intersection(mandatory_keys_set): + raise ValueError( + f"Configuration file {conf_file} does not contain any of the " + f"mandatory keys: {list(mandatory_keys_set)}. 
" + f"Found keys: {list(config_keys)}" + ) + configuration = RecognizerConfigurationLoader._merge_configuration( registry_configuration=configuration, config_from_file=config_from_file ) + return configuration @staticmethod diff --git a/presidio-analyzer/presidio_analyzer/remote_recognizer.py b/presidio-analyzer/presidio_analyzer/remote_recognizer.py index c734b3c4a8..0350e2f5ae 100644 --- a/presidio-analyzer/presidio_analyzer/remote_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/remote_recognizer.py @@ -1,8 +1,10 @@ from abc import ABC, abstractmethod -from typing import List, Optional +from typing import TYPE_CHECKING, List, Optional from presidio_analyzer import EntityRecognizer -from presidio_analyzer.nlp_engine import NlpArtifacts + +if TYPE_CHECKING: + from presidio_analyzer.nlp_engine import NlpArtifacts class RemoteRecognizer(ABC, EntityRecognizer): @@ -35,7 +37,7 @@ def load(self): # noqa: D102 pass @abstractmethod - def analyze(self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts): + def analyze(self, text: str, entities: List[str], nlp_artifacts: "NlpArtifacts"): """ Call an external service for PII detection. 
diff --git a/presidio-analyzer/pyproject.toml b/presidio-analyzer/pyproject.toml index 99662aaad1..bf32f71134 100644 --- a/presidio-analyzer/pyproject.toml +++ b/presidio-analyzer/pyproject.toml @@ -27,7 +27,8 @@ dependencies = [ "regex", "tldextract", "pyyaml", - "phonenumbers (>=8.12,<10.0.0)" + "phonenumbers (>=8.12,<10.0.0)", + "pydantic (>=2.0.0,<3.0.0)" ] [project.optional-dependencies] diff --git a/presidio-analyzer/tests/conf/custom_recognizer_yaml.yaml b/presidio-analyzer/tests/conf/custom_recognizer_yaml.yaml index 36adf864a5..d75abe5359 100644 --- a/presidio-analyzer/tests/conf/custom_recognizer_yaml.yaml +++ b/presidio-analyzer/tests/conf/custom_recognizer_yaml.yaml @@ -13,6 +13,7 @@ recognizer_registry: supported_entity: "ZIP" - name: "SpacyRecognizer" enabled: false + type: predefined supported_languages: - en diff --git a/presidio-analyzer/tests/conf/missing_global_regex_flags.yaml b/presidio-analyzer/tests/conf/missing_global_regex_flags.yaml new file mode 100644 index 0000000000..3b7c618fbd --- /dev/null +++ b/presidio-analyzer/tests/conf/missing_global_regex_flags.yaml @@ -0,0 +1,8 @@ +# Test configuration file with missing global_regex_flags field +# This should raise a warning and use default value + +supported_languages: + - en +recognizers: + - CreditCardRecognizer + - EmailRecognizer diff --git a/presidio-analyzer/tests/conf/missing_recognizers.yaml b/presidio-analyzer/tests/conf/missing_recognizers.yaml new file mode 100644 index 0000000000..d2543c5896 --- /dev/null +++ b/presidio-analyzer/tests/conf/missing_recognizers.yaml @@ -0,0 +1,7 @@ +# Test configuration file with missing recognizers field +# This should raise an exception + +supported_languages: + - en + - es +global_regex_flags: 26 \ No newline at end of file diff --git a/presidio-analyzer/tests/conf/test_analyzer_engine.yaml b/presidio-analyzer/tests/conf/test_analyzer_engine.yaml index ec9d528984..9d68dea705 100644 --- a/presidio-analyzer/tests/conf/test_analyzer_engine.yaml +++ 
b/presidio-analyzer/tests/conf/test_analyzer_engine.yaml @@ -4,7 +4,7 @@ recognizer_registry: - name: CreditCardRecognizer supported_languages: - en - supported_entity: IT_FISCAL_CODE + supported_entity: CREDIT_CARD type: predefined - name: ItFiscalCodeRecognizer diff --git a/presidio-analyzer/tests/conf/test_analyzer_engine_missing_values.yaml b/presidio-analyzer/tests/conf/test_analyzer_engine_missing_values.yaml deleted file mode 100644 index 3abaf9b003..0000000000 --- a/presidio-analyzer/tests/conf/test_analyzer_engine_missing_values.yaml +++ /dev/null @@ -1,7 +0,0 @@ -recognizer_registry: - global_regex_flags: 26 - -supported_languages: - - de - - en - - es \ No newline at end of file diff --git a/presidio-analyzer/tests/conf/test_minimal_registry_conf.yaml b/presidio-analyzer/tests/conf/test_minimal_registry_conf.yaml index 18a2bd24fa..6cfd0c3898 100644 --- a/presidio-analyzer/tests/conf/test_minimal_registry_conf.yaml +++ b/presidio-analyzer/tests/conf/test_minimal_registry_conf.yaml @@ -1,4 +1,5 @@ global_regex_flags: 26 -recognizers: [] +recognizers: + - name: "CreditCardRecognizer" supported_languages: - en \ No newline at end of file diff --git a/presidio-analyzer/tests/test_analyzer_engine_provider.py b/presidio-analyzer/tests/test_analyzer_engine_provider.py index ba45e00a31..3060ecd549 100644 --- a/presidio-analyzer/tests/test_analyzer_engine_provider.py +++ b/presidio-analyzer/tests/test_analyzer_engine_provider.py @@ -93,27 +93,6 @@ def test_analyzer_engine_provider_configuration_file(): assert engine.nlp_engine.engine_name == "spacy" -def test_analyzer_engine_provider_configuration_file_missing_values_expect_defaults( - mandatory_recognizers, -): - test_yaml, _, _ = get_full_paths("conf/test_analyzer_engine_missing_values.yaml") - provider = AnalyzerEngineProvider(test_yaml) - engine = provider.create_engine() - assert engine.supported_languages == ["de", "en", "es"] - assert engine.default_score_threshold == 0 - recognizer_registry = 
engine.registry - assert ( - recognizer_registry.global_regex_flags - == re.DOTALL | re.MULTILINE | re.IGNORECASE - ) - assert recognizer_registry.supported_languages == ["de", "en", "es"] - names = [recognizer.name for recognizer in recognizer_registry.recognizers] - for predefined_recognizer in mandatory_recognizers: - assert predefined_recognizer in names - assert isinstance(engine.nlp_engine, SpacyNlpEngine) - assert engine.nlp_engine.engine_name == "spacy" - - def test_analyzer_engine_provider_defaults(mandatory_recognizers): provider = AnalyzerEngineProvider() engine = provider.create_engine() @@ -351,3 +330,234 @@ def test_analyzer_engine_provider_one_custom_recognizer(): assert len(analyzer_engine.get_recognizers()) == 1 assert analyzer_engine.analyze("My zip code is 12345", language="en")[0].score == pytest.approx(0.4) + +def test_analyzer_engine_provider_invalid_analyzer_conf_file(): + """Test that invalid analyzer configuration file path raises error.""" + with pytest.raises(ValueError): + AnalyzerEngineProvider(analyzer_engine_conf_file="/nonexistent/path/file.yaml") + + +def test_analyzer_engine_provider_invalid_nlp_conf_file(): + """Test that invalid NLP engine configuration file path raises error.""" + with pytest.raises(ValueError): + AnalyzerEngineProvider(nlp_engine_conf_file="/nonexistent/path/file.yaml") + + +def test_analyzer_engine_provider_invalid_registry_conf_file(): + """Test that invalid recognizer registry configuration file path raises error.""" + with pytest.raises(ValueError): + AnalyzerEngineProvider(recognizer_registry_conf_file="/nonexistent/path/file.yaml") + + +def test_analyzer_engine_provider_get_configuration_with_nonexistent_file(): + """Test get_configuration falls back to default when file doesn't exist.""" + provider = AnalyzerEngineProvider() + + # Test with nonexistent file - should fall back to default + config = provider.get_configuration("/tmp/nonexistent_config_file_12345.yaml") + + # Should return a valid 
configuration (the default one) + assert config is not None + assert isinstance(config, dict) + + +def test_analyzer_engine_provider_get_configuration_with_invalid_yaml(): + """Test get_configuration handles invalid YAML gracefully.""" + import tempfile + import os + + # Create a temporary file with invalid YAML + with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: + f.write("invalid: yaml: content: [[[") + temp_file = f.name + + try: + provider = AnalyzerEngineProvider() + config = provider.get_configuration(temp_file) + + # Should fall back to default configuration + assert config is not None + assert isinstance(config, dict) + finally: + os.unlink(temp_file) + + +def test_analyzer_engine_provider_get_full_conf_path(): + """Test _get_full_conf_path static method.""" + from pathlib import Path + + path = AnalyzerEngineProvider._get_full_conf_path() + + assert isinstance(path, Path) + assert path.name == "default_analyzer.yaml" + assert path.exists() + + +def test_analyzer_engine_provider_get_full_conf_path_custom_file(): + """Test _get_full_conf_path with custom filename.""" + from pathlib import Path + + path = AnalyzerEngineProvider._get_full_conf_path("custom_file.yaml") + + assert isinstance(path, Path) + assert path.name == "custom_file.yaml" + + +def test_analyzer_engine_provider_configuration_property(): + """Test that configuration property is set correctly.""" + provider = AnalyzerEngineProvider() + + assert provider.configuration is not None + assert isinstance(provider.configuration, dict) + + +def test_analyzer_engine_provider_nlp_engine_conf_file_property(): + """Test that nlp_engine_conf_file property is stored correctly.""" + test_yaml, nlp_yaml, _ = get_full_paths( + "conf/simple_analyzer_engine.yaml", + "conf/default.yaml", + ) + + provider = AnalyzerEngineProvider( + analyzer_engine_conf_file=test_yaml, + nlp_engine_conf_file=nlp_yaml, + ) + + assert provider.nlp_engine_conf_file == nlp_yaml + + +def 
test_analyzer_engine_provider_recognizer_registry_conf_file_property(): + """Test that recognizer_registry_conf_file property is stored correctly.""" + test_yaml, _, registry_yaml = get_full_paths( + "conf/simple_analyzer_engine.yaml", + None, + "conf/test_recognizer_registry.yaml", + ) + + provider = AnalyzerEngineProvider( + analyzer_engine_conf_file=test_yaml, + recognizer_registry_conf_file=registry_yaml, + ) + + assert provider.recognizer_registry_conf_file == registry_yaml + + +def test_analyzer_engine_provider_load_nlp_engine_from_conf(): + """Test _load_nlp_engine with nlp_configuration in analyzer config.""" + analyzer_yaml, _, _ = get_full_paths("conf/test_analyzer_engine.yaml") + + provider = AnalyzerEngineProvider(analyzer_engine_conf_file=analyzer_yaml) + nlp_engine = provider._load_nlp_engine() + + assert nlp_engine is not None + assert nlp_engine.engine_name == "spacy" + + +def test_analyzer_engine_provider_load_nlp_engine_default(): + """Test _load_nlp_engine falls back to default when no config provided.""" + provider = AnalyzerEngineProvider() + nlp_engine = provider._load_nlp_engine() + + assert nlp_engine is not None + assert isinstance(nlp_engine, SpacyNlpEngine) + + +def test_analyzer_engine_provider_load_recognizer_registry_from_embedded_config(): + """Test _load_recognizer_registry with embedded recognizer_registry in config.""" + analyzer_yaml, _, _ = get_full_paths("conf/test_analyzer_engine.yaml") + + provider = AnalyzerEngineProvider(analyzer_engine_conf_file=analyzer_yaml) + nlp_engine = provider._load_nlp_engine() + + registry = provider._load_recognizer_registry( + supported_languages=["en"], + nlp_engine=nlp_engine, + ) + + assert registry is not None + assert len(registry.recognizers) > 0 + + +def test_analyzer_engine_provider_load_recognizer_registry_default(): + """Test _load_recognizer_registry uses default when no config provided.""" + provider = AnalyzerEngineProvider() + nlp_engine = provider._load_nlp_engine() + + registry = 
provider._load_recognizer_registry( + supported_languages=["en"], + nlp_engine=nlp_engine, + ) + + assert registry is not None + assert len(registry.recognizers) > 0 + + +def test_analyzer_engine_provider_create_engine_with_all_params(): + """Test create_engine with all configuration parameters.""" + analyzer_yaml, nlp_yaml, registry_yaml = get_full_paths( + "conf/simple_analyzer_engine.yaml", + "conf/default.yaml", + "conf/test_recognizer_registry.yaml", + ) + + provider = AnalyzerEngineProvider( + analyzer_engine_conf_file=analyzer_yaml, + nlp_engine_conf_file=nlp_yaml, + recognizer_registry_conf_file=registry_yaml, + ) + + engine = provider.create_engine() + + assert engine is not None + assert engine.nlp_engine is not None + assert engine.registry is not None + assert len(engine.supported_languages) > 0 + + +def test_analyzer_engine_provider_multiple_languages_support(): + """Test analyzer engine with multiple language support.""" + analyzer_yaml, _, _ = get_full_paths("conf/test_analyzer_engine.yaml") + + provider = AnalyzerEngineProvider(analyzer_engine_conf_file=analyzer_yaml) + engine = provider.create_engine() + + assert "en" in engine.supported_languages + assert "de" in engine.supported_languages + assert "es" in engine.supported_languages + + +def test_analyzer_engine_provider_default_score_threshold(): + """Test that default_score_threshold is properly set.""" + analyzer_yaml, _, _ = get_full_paths("conf/test_analyzer_engine.yaml") + + provider = AnalyzerEngineProvider(analyzer_engine_conf_file=analyzer_yaml) + engine = provider.create_engine() + + assert engine.default_score_threshold == 0.7 + + +def test_analyzer_engine_provider_with_pathlib_path(): + """Test AnalyzerEngineProvider works with pathlib.Path objects.""" + from pathlib import Path + + analyzer_yaml, _, _ = get_full_paths("conf/simple_analyzer_engine.yaml") + analyzer_path = Path(analyzer_yaml) + + provider = AnalyzerEngineProvider(analyzer_engine_conf_file=analyzer_path) + engine = 
provider.create_engine() + + assert engine is not None + + +def test_analyzer_engine_provider_configuration_logging(caplog): + """Test that configuration loading logs appropriate messages.""" + import logging + + with caplog.at_level(logging.INFO): + provider = AnalyzerEngineProvider() + _ = provider.create_engine() + + # Check that some logging occurred + assert len(caplog.records) > 0 + + diff --git a/presidio-analyzer/tests/test_analyzer_request.py b/presidio-analyzer/tests/test_analyzer_request.py new file mode 100644 index 0000000000..c704ac9d57 --- /dev/null +++ b/presidio-analyzer/tests/test_analyzer_request.py @@ -0,0 +1,279 @@ +import regex as re +from presidio_analyzer import AnalyzerRequest, PatternRecognizer + + +class TestAnalyzerRequest: + """Tests for AnalyzerRequest class.""" + + def test_analyzer_request_basic_fields(self): + """Test basic field initialization.""" + req_data = { + "text": "My phone number is 555-1234", + "language": "en", + "entities": ["PHONE_NUMBER"], + "correlation_id": "test-123", + "score_threshold": 0.5, + "return_decision_process": True, + } + + request = AnalyzerRequest(req_data) + + assert request.text == "My phone number is 555-1234" + assert request.language == "en" + assert request.entities == ["PHONE_NUMBER"] + assert request.correlation_id == "test-123" + assert request.score_threshold == 0.5 + assert request.return_decision_process is True + + def test_analyzer_request_with_context(self): + """Test context field initialization (line 37).""" + req_data = { + "text": "Test text", + "language": "en", + "context": ["previous message", "current message"] + } + + request = AnalyzerRequest(req_data) + + assert request.context == ["previous message", "current message"] + + def test_analyzer_request_with_allow_list(self): + """Test allow_list field initialization (line 38).""" + req_data = { + "text": "Test text", + "language": "en", + "allow_list": ["John", "Microsoft", "Seattle"] + } + + request = AnalyzerRequest(req_data) 
+ + assert request.allow_list == ["John", "Microsoft", "Seattle"] + + def test_analyzer_request_with_allow_list_match_default(self): + """Test allow_list_match field with default value (line 39).""" + req_data = { + "text": "Test text", + "language": "en", + } + + request = AnalyzerRequest(req_data) + + # Should default to "exact" + assert request.allow_list_match == "exact" + + def test_analyzer_request_with_allow_list_match_custom(self): + """Test allow_list_match field with custom value (line 39).""" + req_data = { + "text": "Test text", + "language": "en", + "allow_list_match": "partial" + } + + request = AnalyzerRequest(req_data) + + assert request.allow_list_match == "partial" + + def test_analyzer_request_with_regex_flags_default(self): + """Test regex_flags field with default value (line 40).""" + req_data = { + "text": "Test text", + "language": "en", + } + + request = AnalyzerRequest(req_data) + + # Should default to DOTALL | MULTILINE | IGNORECASE + expected_flags = re.DOTALL | re.MULTILINE | re.IGNORECASE + assert request.regex_flags == expected_flags + + def test_analyzer_request_with_regex_flags_custom(self): + """Test regex_flags field with custom value (line 40).""" + custom_flags = re.IGNORECASE | re.UNICODE + req_data = { + "text": "Test text", + "language": "en", + "regex_flags": custom_flags + } + + request = AnalyzerRequest(req_data) + + assert request.regex_flags == custom_flags + + def test_analyzer_request_without_context(self): + """Test that context is None when not provided.""" + req_data = { + "text": "Test text", + "language": "en", + } + + request = AnalyzerRequest(req_data) + + assert request.context is None + + def test_analyzer_request_without_allow_list(self): + """Test that allow_list is None when not provided.""" + req_data = { + "text": "Test text", + "language": "en", + } + + request = AnalyzerRequest(req_data) + + assert request.allow_list is None + + def test_analyzer_request_all_fields(self): + """Test initialization with 
all fields including lines 37-40.""" + req_data = { + "text": "My name is John and my email is john@example.com", + "language": "en", + "entities": ["PERSON", "EMAIL_ADDRESS"], + "correlation_id": "full-test-456", + "score_threshold": 0.7, + "return_decision_process": False, + "ad_hoc_recognizers": [ + { + "supported_entity": "CUSTOM_ENTITY", + "supported_language": "en", + "patterns": [ + { + "name": "custom_pattern", + "regex": r"\d{3}-\d{3}", + "score": 0.5 + } + ] + } + ], + "context": ["user profile", "chat history"], + "allow_list": ["John", "Microsoft"], + "allow_list_match": "fuzzy", + "regex_flags": re.IGNORECASE + } + + request = AnalyzerRequest(req_data) + + assert request.text == "My name is John and my email is john@example.com" + assert request.language == "en" + assert request.entities == ["PERSON", "EMAIL_ADDRESS"] + assert request.correlation_id == "full-test-456" + assert request.score_threshold == 0.7 + assert request.return_decision_process is False + assert len(request.ad_hoc_recognizers) == 1 + assert isinstance(request.ad_hoc_recognizers[0], PatternRecognizer) + assert request.context == ["user profile", "chat history"] + assert request.allow_list == ["John", "Microsoft"] + assert request.allow_list_match == "fuzzy" + assert request.regex_flags == re.IGNORECASE + + def test_analyzer_request_with_ad_hoc_recognizers(self): + """Test ad_hoc_recognizers field initialization.""" + req_data = { + "text": "Test text", + "language": "en", + "ad_hoc_recognizers": [ + { + "supported_entity": "CUSTOM_ID", + "supported_language": "en", + "patterns": [ + { + "name": "id_pattern", + "regex": r"ID-\d{5}", + "score": 0.8 + } + ] + } + ] + } + + request = AnalyzerRequest(req_data) + + assert len(request.ad_hoc_recognizers) == 1 + assert isinstance(request.ad_hoc_recognizers[0], PatternRecognizer) + assert request.ad_hoc_recognizers[0].supported_entities == ["CUSTOM_ID"] + + def test_analyzer_request_without_ad_hoc_recognizers(self): + """Test that 
ad_hoc_recognizers is empty list when not provided.""" + req_data = { + "text": "Test text", + "language": "en", + } + + request = AnalyzerRequest(req_data) + + assert request.ad_hoc_recognizers == [] + + def test_analyzer_request_empty_dict(self): + """Test initialization with empty dictionary.""" + req_data = {} + + request = AnalyzerRequest(req_data) + + assert request.text is None + assert request.language is None + assert request.entities is None + assert request.correlation_id is None + assert request.score_threshold is None + assert request.return_decision_process is None + assert request.ad_hoc_recognizers == [] + assert request.context is None + assert request.allow_list is None + assert request.allow_list_match == "exact" + assert request.regex_flags == (re.DOTALL | re.MULTILINE | re.IGNORECASE) + + def test_analyzer_request_with_complex_context(self): + """Test context field with various data types.""" + req_data = { + "text": "Test text", + "language": "en", + "context": { + "user_id": "12345", + "session": "abc", + "metadata": {"key": "value"} + } + } + + request = AnalyzerRequest(req_data) + + assert request.context == { + "user_id": "12345", + "session": "abc", + "metadata": {"key": "value"} + } + + def test_analyzer_request_with_multiple_regex_flags(self): + """Test regex_flags with multiple combined flags.""" + custom_flags = re.IGNORECASE | re.MULTILINE | re.DOTALL | re.VERBOSE + req_data = { + "text": "Test text", + "language": "en", + "regex_flags": custom_flags + } + + request = AnalyzerRequest(req_data) + + assert request.regex_flags == custom_flags + # Verify individual flags are present + assert request.regex_flags & re.IGNORECASE + assert request.regex_flags & re.MULTILINE + assert request.regex_flags & re.DOTALL + assert request.regex_flags & re.VERBOSE + + def test_analyzer_request_allow_list_match_variations(self): + """Test various allow_list_match values.""" + test_cases = [ + "exact", + "partial", + "fuzzy", + "regex", + 
"custom_match_type" + ] + + for match_type in test_cases: + req_data = { + "text": "Test text", + "language": "en", + "allow_list_match": match_type + } + + request = AnalyzerRequest(req_data) + assert request.allow_list_match == match_type + diff --git a/presidio-analyzer/tests/test_configuration_validator.py b/presidio-analyzer/tests/test_configuration_validator.py new file mode 100644 index 0000000000..ad5ed5687a --- /dev/null +++ b/presidio-analyzer/tests/test_configuration_validator.py @@ -0,0 +1,469 @@ +"""Tests for the Pydantic-based validation system using existing adapted classes.""" +import pytest + +from presidio_analyzer.input_validation import ConfigurationValidator + + +# ========== Language Code Validation Tests ========== + +def test_validate_language_codes_valid(): + """Test valid language codes.""" + valid_languages = ["en", "es", "fr", "de"] + result = ConfigurationValidator.validate_language_codes(valid_languages) + assert result == valid_languages + + +def test_validate_language_codes_valid_with_country(): + """Test valid language codes with country codes.""" + valid_languages = ["en-US", "en-GB", "es-ES"] + result = ConfigurationValidator.validate_language_codes(valid_languages) + assert result == valid_languages + + +def test_validate_language_codes_invalid_format(): + """Test invalid language code format.""" + invalid_languages = ["english", "EN", "e", "en-us"] + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_language_codes(invalid_languages) + assert "Invalid language code format" in str(exc_info.value) + + +def test_validate_language_codes_mixed_invalid(): + """Test mixed valid and invalid language codes.""" + mixed_languages = ["en", "invalid_lang", "es"] + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_language_codes(mixed_languages) + assert "Invalid language code format" in str(exc_info.value) + + +# ========== File Path Validation Tests ========== + +def 
test_file_path_validation_success(tmp_path): + """Test file path validation with existing file.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test: content") + + validated_path = ConfigurationValidator.validate_file_path(str(test_file)) + assert validated_path == test_file + + +def test_file_path_validation_with_path_object(tmp_path): + """Test file path validation with Path object.""" + test_file = tmp_path / "test.yaml" + test_file.write_text("test: content") + + validated_path = ConfigurationValidator.validate_file_path(test_file) + assert validated_path == test_file + + +def test_file_path_validation_nonexistent(): + """Test file path validation with non-existent file.""" + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_file_path("/nonexistent/file.yaml") + + assert "does not exist" in str(exc_info.value) + + +def test_file_path_validation_directory(tmp_path): + """Test file path validation with directory instead of file.""" + test_dir = tmp_path / "test_directory" + test_dir.mkdir() + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_file_path(test_dir) + + assert "not a file" in str(exc_info.value) + + +# ========== Score Threshold Validation Tests ========== + +def test_validate_score_threshold_valid(): + """Test valid score thresholds.""" + valid_thresholds = [0.0, 0.5, 1.0, 0.25, 0.75] + for threshold in valid_thresholds: + result = ConfigurationValidator.validate_score_threshold(threshold) + assert result == threshold + + +def test_validate_score_threshold_above_one(): + """Test score threshold above 1.0.""" + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_score_threshold(1.5) + assert "must be between 0.0 and 1.0" in str(exc_info.value) + + +def test_validate_score_threshold_negative(): + """Test negative score threshold.""" + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_score_threshold(-0.1) + assert "must be 
between 0.0 and 1.0" in str(exc_info.value) + + +def test_validate_score_threshold_way_above(): + """Test score threshold far above valid range.""" + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_score_threshold(100.0) + assert "must be between 0.0 and 1.0" in str(exc_info.value) + + +# ========== NLP Configuration Validation Tests ========== + +def test_configuration_validator_nlp_config_valid(): + """Test ConfigurationValidator accepts valid NLP validation.""" + valid_config = { + "nlp_engine_name": "spacy", + "models": [ + {"lang_code": "en", "model_name": "en_core_web_lg"} + ] + } + + validated = ConfigurationValidator.validate_nlp_configuration(valid_config) + assert validated == valid_config + + +def test_nlp_config_multiple_models(): + """Test NLP configuration with multiple models.""" + valid_config = { + "nlp_engine_name": "spacy", + "models": [ + {"lang_code": "en", "model_name": "en_core_web_lg"}, + {"lang_code": "es", "model_name": "es_core_news_lg"} + ] + } + + validated = ConfigurationValidator.validate_nlp_configuration(valid_config) + assert validated == valid_config + + +def test_configuration_validator_nlp_config_missing_fields(): + """Test ConfigurationValidator rejects NLP config with missing required fields.""" + invalid_config = { + "nlp_engine_name": "spacy" + # Missing "models" field + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + + assert "missing required fields" in str(exc_info.value) + + +def test_nlp_config_missing_nlp_engine_name(): + """Test NLP config missing nlp_engine_name.""" + invalid_config = { + "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "missing required fields" in str(exc_info.value) + + +def test_nlp_config_not_dict(): + """Test NLP configuration that is not a 
dictionary.""" + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration("not a dict") + assert "must be a dictionary" in str(exc_info.value) + + +def test_nlp_config_models_not_list(): + """Test NLP configuration with models not as list.""" + invalid_config = { + "nlp_engine_name": "spacy", + "models": {"lang_code": "en", "model_name": "en_core_web_lg"} + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "Models must be a non-empty list" in str(exc_info.value) + + +def test_nlp_config_models_empty_list(): + """Test NLP configuration with empty models list.""" + invalid_config = { + "nlp_engine_name": "spacy", + "models": [] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "Models must be a non-empty list" in str(exc_info.value) + + +def test_nlp_config_model_not_dict(): + """Test NLP configuration with model that is not a dict.""" + invalid_config = { + "nlp_engine_name": "spacy", + "models": ["en_core_web_lg"] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "Each model must be a dictionary" in str(exc_info.value) + + +def test_nlp_config_model_missing_lang_code(): + """Test NLP configuration with model missing lang_code.""" + invalid_config = { + "nlp_engine_name": "spacy", + "models": [{"model_name": "en_core_web_lg"}] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "must have 'lang_code' and 'model_name'" in str(exc_info.value) + + +def test_nlp_config_model_missing_model_name(): + """Test NLP configuration with model missing model_name.""" + invalid_config = { + "nlp_engine_name": "spacy", + "models": [{"lang_code": "en"}] + } + + with pytest.raises(ValueError) as exc_info: + 
ConfigurationValidator.validate_nlp_configuration(invalid_config) + assert "must have 'lang_code' and 'model_name'" in str(exc_info.value) + + +# ========== Recognizer Registry Configuration Tests ========== + +def test_recognizer_registry_valid_custom_recognizer(): + """Test valid recognizer registry configuration with custom recognizer.""" + valid_config = { + "supported_languages": ["en"], + "recognizers": [ + { + "name": "CustomRecognizer", + "type": "custom", + "supported_entity": "CUSTOM_ENTITY", + "patterns": [ + { + "name": "pattern1", + "regex": "test", + "score": 0.5 + } + ] + } + ] + } + + result = ConfigurationValidator.validate_recognizer_registry_configuration(valid_config) + assert result is not None + assert "recognizers" in result + + +def test_recognizer_registry_valid_predefined_recognizer(): + """Test valid recognizer registry configuration with predefined recognizer.""" + valid_config = { + "supported_languages": ["en"], + "recognizers": [ + { + "name": "CreditCardRecognizer", + "type": "predefined" + } + ] + } + + result = ConfigurationValidator.validate_recognizer_registry_configuration(valid_config) + assert result is not None + + +def test_recognizer_registry_empty_recognizers_list(): + """Test recognizer registry with empty recognizers list.""" + invalid_config = { + "supported_languages": ["en"], + "recognizers": [] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_recognizer_registry_configuration(invalid_config) + assert "Invalid recognizer registry configuration" in str(exc_info.value) + + +def test_configuration_validator_recognizer_registry_unknown_keys(): + """Test ConfigurationValidator rejects recognizer registry config with unknown keys.""" + invalid_config = { + "supported_languages": ["en"], + "global_regex_flags": 26, + "recognizers": [ + { + "name": "CreditCardRecognizer", + "type": "predefined" + } + ], + "invalid_field": "value", + "typo_key": 456 + } + + with pytest.raises(ValueError) as 
exc_info: + ConfigurationValidator.validate_recognizer_registry_configuration(invalid_config) + assert "Invalid recognizer registry configuration" in str(exc_info.value) + + +# ========== Analyzer Configuration Tests ========== + +def test_configuration_validator_analyzer_config_valid(): + """Test ConfigurationValidator accepts valid analyzer validation.""" + valid_config = { + "supported_languages": ["en", "es"], + "default_score_threshold": 0.5, + "nlp_configuration": { + "nlp_engine_name": "spacy", + "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}] + } + } + + validated = ConfigurationValidator.validate_analyzer_configuration(valid_config) + assert validated == valid_config + + +def test_analyzer_config_minimal(): + """Test minimal valid analyzer configuration.""" + valid_config = { + "supported_languages": ["en"] + } + + validated = ConfigurationValidator.validate_analyzer_configuration(valid_config) + assert validated == valid_config + + +def test_analyzer_config_with_recognizer_registry(): + """Test analyzer configuration with recognizer registry.""" + valid_config = { + "supported_languages": ["en"], + "recognizer_registry": { + "supported_languages": ["en"], + "recognizers": [ + { + "name": "CreditCardRecognizer", + "type": "predefined" + } + ] + } + } + + validated = ConfigurationValidator.validate_analyzer_configuration(valid_config) + assert validated is not None + + +def test_configuration_validator_analyzer_config_invalid_threshold(): + """Test ConfigurationValidator rejects invalid score threshold.""" + invalid_config = { + "supported_languages": ["en"], + "default_score_threshold": 1.5 # Invalid: > 1.0 + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_analyzer_configuration(invalid_config) + + assert "must be between 0.0 and 1.0" in str(exc_info.value) + + +def test_analyzer_config_not_dict(): + """Test analyzer configuration that is not a dictionary.""" + with pytest.raises(ValueError) as exc_info: 
+ ConfigurationValidator.validate_analyzer_configuration("not a dict") + assert "must be a dictionary" in str(exc_info.value) + + +def test_configuration_validator_analyzer_config_unknown_keys(): + """Test ConfigurationValidator rejects analyzer config with unknown keys.""" + invalid_config = { + "supported_languages": ["en"], + "default_score_threshold": 0.5, + "unknown_key": "some_value", + "another_typo": 123 + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_analyzer_configuration(invalid_config) + assert "Unknown configuration key" in str(exc_info.value) + + +def test_analyzer_config_invalid_languages(): + """Test analyzer configuration with invalid language codes.""" + invalid_config = { + "supported_languages": ["invalid_lang"] + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_analyzer_configuration(invalid_config) + assert "Invalid language code format" in str(exc_info.value) + + +def test_analyzer_config_invalid_nlp_nested(): + """Test analyzer configuration with invalid nested NLP config.""" + invalid_config = { + "supported_languages": ["en"], + "nlp_configuration": { + "nlp_engine_name": "spacy" + # Missing models + } + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_analyzer_configuration(invalid_config) + assert "missing required fields" in str(exc_info.value) + + +def test_analyzer_config_invalid_recognizer_registry_nested(): + """Test analyzer configuration with invalid nested recognizer registry.""" + invalid_config = { + "supported_languages": ["en"], + "recognizer_registry": { + "recognizers": [] # Empty list not allowed + } + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_analyzer_configuration(invalid_config) + assert "Invalid recognizer registry configuration" in str(exc_info.value) + + +def test_analyzer_config_threshold_at_boundaries(): + """Test analyzer configuration with threshold at 
boundaries.""" + # Test 0.0 + config_zero = { + "supported_languages": ["en"], + "default_score_threshold": 0.0 + } + validated = ConfigurationValidator.validate_analyzer_configuration(config_zero) + assert validated["default_score_threshold"] == 0.0 + + # Test 1.0 + config_one = { + "supported_languages": ["en"], + "default_score_threshold": 1.0 + } + validated = ConfigurationValidator.validate_analyzer_configuration(config_one) + assert validated["default_score_threshold"] == 1.0 + + +def test_analyzer_config_all_fields(): + """Test analyzer configuration with all fields populated.""" + valid_config = { + "supported_languages": ["en", "es"], + "default_score_threshold": 0.7, + "nlp_configuration": { + "nlp_engine_name": "spacy", + "models": [ + {"lang_code": "en", "model_name": "en_core_web_lg"}, + {"lang_code": "es", "model_name": "es_core_news_lg"} + ] + }, + "recognizer_registry": { + "supported_languages": ["en"], + "global_regex_flags": 26, + "recognizers": [ + { + "name": "CreditCardRecognizer", + "type": "predefined" + } + ] + } + } + + validated = ConfigurationValidator.validate_analyzer_configuration(valid_config) + assert validated == valid_config + diff --git a/presidio-analyzer/tests/test_language_validation.py b/presidio-analyzer/tests/test_language_validation.py new file mode 100644 index 0000000000..7cc22909ff --- /dev/null +++ b/presidio-analyzer/tests/test_language_validation.py @@ -0,0 +1,18 @@ +import pytest + +from presidio_analyzer.input_validation import validate_language_codes + + +def test_configuration_validator_language_codes_no_exception(): + """Test ConfigurationValidator accepts valid language codes.""" + valid_languages = ["en", "es", "fr", "en-US", "es-ES"] + validate_language_codes(valid_languages) + +def test_configuration_validator_language_codes_invalid(): + """Test ConfigurationValidator rejects invalid language codes.""" + invalid_languages = ["invalid_lang"] + + with pytest.raises(ValueError) as exc_info: + 
validate_language_codes(invalid_languages) + + assert "Invalid language code format" in str(exc_info.value) diff --git a/presidio-analyzer/tests/test_ner_model_configuration.py b/presidio-analyzer/tests/test_ner_model_configuration.py index 09c1e95cc5..56d12070e3 100644 --- a/presidio-analyzer/tests/test_ner_model_configuration.py +++ b/presidio-analyzer/tests/test_ner_model_configuration.py @@ -2,6 +2,7 @@ import pytest import yaml +from pydantic import ValidationError from presidio_analyzer.nlp_engine import NerModelConfiguration @@ -43,9 +44,7 @@ def test_from_dict_happy_path( [ ("stride", []), ("stride", "X"), - ("stride", None), ("alignment_mode", 5), - ("alignment_mode", None), ("low_confidence_score_multiplier", "X"), ], ) @@ -55,3 +54,54 @@ def test_from_dict_wrong_types(ner_model_configuration_dict, key, value): with pytest.raises(ValueError): NerModelConfiguration.from_dict(new_config) + +@pytest.mark.parametrize( + "key, value", + [ + ("stride", None), + ("alignment_mode", None), + ], +) +def test_from_dict_none_resolves_to_default(ner_model_configuration_dict, key, value): + new_config = ner_model_configuration_dict.copy() + new_config[key] = value + ner_model_configuration = NerModelConfiguration.from_dict(new_config) + assert ner_model_configuration.stride is not None + assert ner_model_configuration.alignment_mode is not None + + +def test_ner_model_configuration_validation_success(): + """Test NerModelConfiguration validates correctly.""" + config_data = { + "aggregation_strategy": "max", + "stride": 16, + "alignment_mode": "expand", + "default_score": 0.9, + "low_confidence_score_multiplier": 0.3 + } + + config = NerModelConfiguration.from_dict(config_data) + assert config.aggregation_strategy == "max" + assert config.stride == 16 + assert config.default_score == 0.9 + assert config.low_confidence_score_multiplier == 0.3 + +def test_ner_model_configuration_invalid_score(): + """Test NerModelConfiguration rejects invalid score values.""" + 
config_data = { + "default_score": 1.5 # Invalid: > 1.0 + } + + with pytest.raises(ValidationError) as exc_info: + NerModelConfiguration.from_dict(config_data) + + assert "less than or equal to 1" in str(exc_info.value) + +def test_backward_compatibility_ner_config_to_dict(): + """Test that NerModelConfiguration maintains backward compatibility.""" + config = NerModelConfiguration(default_score=0.8, stride=20) + config_dict = config.to_dict() + + assert "default_score" in config_dict + assert config_dict["default_score"] == 0.8 + assert config_dict["stride"] == 20 diff --git a/presidio-analyzer/tests/test_nlp_engine_provider.py b/presidio-analyzer/tests/test_nlp_engine_provider.py index b65f315220..f10517c3ed 100644 --- a/presidio-analyzer/tests/test_nlp_engine_provider.py +++ b/presidio-analyzer/tests/test_nlp_engine_provider.py @@ -251,17 +251,22 @@ def test_when_valid_nlp_engines_then_return_default_configuration(): def test_when_nlp_engines_type_is_not_tuple_then_fail(): + """Test that nlp_engines accepts lists (not just tuples) after removing legacy validation.""" nlp_engines = [SpacyNlpEngine, StanzaNlpEngine, TransformersNlpEngine] - with pytest.raises(ValueError): - NlpEngineProvider(nlp_engines) - - + # After removing legacy validation, lists are now accepted (they work just as well as tuples) + provider = NlpEngineProvider(nlp_engines=nlp_engines) + engine = provider.create_engine() + assert isinstance(engine, SpacyNlpEngine) + def test_when_invalid_nlp_engine_types_then_fail(): + """Test that invalid nlp_engine types will fail when accessing attributes.""" nlp_engines = (1, 2, 3) - with pytest.raises(ValueError): - NlpEngineProvider(nlp_engines) + # After removing legacy validation, this fails with AttributeError when accessing .is_available + with pytest.raises(AttributeError): + NlpEngineProvider(nlp_engines=nlp_engines) + def test_when_valid_nlp_configuration_then_return_default_configuration(): @@ -276,13 +281,16 @@ def 
test_when_valid_nlp_configuration_then_return_default_configuration(): def test_when_nlp_configuration_is_passed_instead_of_nlp_engines_then_fail(): + """Test that passing nlp_configuration as positional argument fails.""" nlp_configuration = { "nlp_engine_name": "stanza", "models": [{"lang_code": "en", "model_name": "en"}] } - with pytest.raises(ValueError): + # This fails because nlp_configuration is passed as positional arg (interpreted as nlp_engines) + with pytest.raises(AttributeError): NlpEngineProvider(nlp_configuration) + def test_when_nlp_configuration_is_not_dict_then_fail(): @@ -327,7 +335,7 @@ def test_when_conf_file_is_empty_string_then_fail(): def test_when_conf_file_is_not_string_or_path_then_fail(): conf_file = 1 - with pytest.raises(ValueError): + with pytest.raises(TypeError): NlpEngineProvider(conf_file=conf_file) diff --git a/presidio-analyzer/tests/test_pattern.py b/presidio-analyzer/tests/test_pattern.py index a276b69613..255aa9e7b6 100644 --- a/presidio-analyzer/tests/test_pattern.py +++ b/presidio-analyzer/tests/test_pattern.py @@ -27,3 +27,49 @@ def test_when_use_from_dict_return_pattern(my_pattern, my_pattern_dict): assert expected.name == actual.name assert expected.score == actual.score assert expected.regex == actual.regex + + +def test_pattern_validation_success(): + """Test that Pattern class validates correctly with valid data.""" + pattern_data = { + "name": "US ZIP Code", + "regex": r"\b\d{5}(?:-\d{4})?\b", + "score": 0.85 + } + + pattern = Pattern.from_dict(pattern_data) + assert pattern.name == "US ZIP Code" + assert pattern.score == 0.85 + assert pattern.regex == r"\b\d{5}(?:-\d{4})?\b" + +def test_pattern_validation_invalid_regex(): + """Test that Pattern class rejects invalid regex patterns.""" + pattern_data = { + "name": "Invalid Pattern", + "regex": "[unclosed_bracket", # Invalid regex + "score": 0.5 + } + + with pytest.raises(ValueError) as exc_info: + Pattern.from_dict(pattern_data) + + +def 
test_pattern_validation_invalid_score_range(): + """Test that Pattern class rejects scores outside [0,1] range.""" + pattern_data = { + "name": "Invalid Score", + "regex": r"\btest\b", + "score": 1.5 # Invalid: > 1.0 + } + + with pytest.raises(ValueError): + Pattern.from_dict(pattern_data) + + +def test_backward_compatibility_pattern_to_dict(): + """Test that Pattern maintains backward compatibility with to_dict method.""" + pattern = Pattern(name="test", regex=r"\btest\b", score=0.5) + pattern_dict = pattern.to_dict() + + expected = {"name": "test", "regex": r"\btest\b", "score": 0.5} + assert pattern_dict == expected diff --git a/presidio-analyzer/tests/test_pattern_recognizer.py b/presidio-analyzer/tests/test_pattern_recognizer.py index b26d6cc278..2c39f05ce9 100644 --- a/presidio-analyzer/tests/test_pattern_recognizer.py +++ b/presidio-analyzer/tests/test_pattern_recognizer.py @@ -219,3 +219,282 @@ def test_global_regex_flag_deny_list_returns_right_result(global_flag, expected_ results = recognizer_ignore_case.analyze(text=text, entities=["TITLE"]) assert len(results) == expected_len + + +def test_pattern_recognizer_with_invalidate_result(): + """Test PatternRecognizer with invalidate_result returning True.""" + class InvalidatingRecognizer(PatternRecognizer): + def invalidate_result(self, pattern_text): + # Invalidate if pattern starts with '0' + return pattern_text.startswith('0') + + patterns = [Pattern(name="test_pattern", regex=r"\d{3}", score=0.8)] + recognizer = InvalidatingRecognizer( + supported_entity="TEST", + patterns=patterns, + name="InvalidatingTest", + ) + + # Test with valid pattern (doesn't start with 0) + results = recognizer.analyze("Test 123 and 456", ["TEST"]) + assert len(results) == 2 + assert all(r.score == 0.8 for r in results) + + # Test with invalidated pattern (starts with 0) + results = recognizer.analyze("Test 012 and 098", ["TEST"]) + assert len(results) == 0 # Should be filtered out due to MIN_SCORE + + +def 
test_pattern_recognizer_with_validate_result_false(): + """Test PatternRecognizer with validate_result returning False.""" + class ValidatingRecognizer(PatternRecognizer): + def validate_result(self, pattern_text): + # Only validate if it contains digit '5' + return '5' in pattern_text + + patterns = [Pattern(name="test_pattern", regex=r"\d{3}", score=0.5)] + recognizer = ValidatingRecognizer( + supported_entity="TEST", + patterns=patterns, + name="ValidatingTest", + ) + + # Test with valid pattern (contains 5) + results = recognizer.analyze("Test 456", ["TEST"]) + assert len(results) == 1 + assert results[0].score == 1.0 # MAX_SCORE + + # Test with invalid pattern (no 5) + results = recognizer.analyze("Test 123", ["TEST"]) + assert len(results) == 0 # Filtered due to MIN_SCORE + + +def test_pattern_recognizer_with_both_validate_and_invalidate(): + """Test PatternRecognizer with both validate and invalidate logic.""" + class BothRecognizer(PatternRecognizer): + def validate_result(self, pattern_text): + return len(pattern_text) == 3 + + def invalidate_result(self, pattern_text): + return pattern_text == "000" + + patterns = [Pattern(name="test_pattern", regex=r"\d{3}", score=0.5)] + recognizer = BothRecognizer( + supported_entity="TEST", + patterns=patterns, + name="BothTest", + ) + + # Test with valid and not invalidated + results = recognizer.analyze("Test 123", ["TEST"]) + assert len(results) == 1 + assert results[0].score == 1.0 + + # Test with invalidated + results = recognizer.analyze("Test 000", ["TEST"]) + assert len(results) == 0 + + +def test_pattern_recognizer_empty_match_skipped(): + """Test that empty regex matches are skipped.""" + patterns = [Pattern(name="test_pattern", regex=r"\d*", score=0.5)] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + name="EmptyMatchTest", + ) + + # This regex can match empty strings + results = recognizer.analyze("abc", ["TEST"]) + # Empty matches should be filtered out + assert 
len(results) == 0 + + +def test_pattern_recognizer_to_dict(): + """Test serialization of PatternRecognizer to dict.""" + patterns = [Pattern(name="p1", regex=r"\d+", score=0.8)] + deny_list = ["word1", "word2"] + context = ["context1", "context2"] + + recognizer = PatternRecognizer( + supported_entity="TEST_ENTITY", + patterns=patterns, + deny_list=deny_list, + context=context, + name="TestRecognizer", + version="1.0.0", + ) + + result_dict = recognizer.to_dict() + + assert result_dict["supported_entity"] == "TEST_ENTITY" + assert "supported_entities" not in result_dict + assert len(result_dict["patterns"]) == 2 # 1 pattern + 1 deny_list pattern + assert result_dict["deny_list"] == deny_list + assert result_dict["context"] == context + assert result_dict["name"] == "TestRecognizer" + assert result_dict["version"] == "1.0.0" + + +def test_pattern_recognizer_from_dict_with_both_supported_entity_and_entities(): + """Test from_dict raises error when both supported_entity and supported_entities present.""" + recognizer_dict = { + "supported_entity": "ENTITY_A", + "supported_entities": ["ENTITY_B"], + "patterns": [{"name": "p1", "score": 0.5, "regex": r"\d+"}], + } + + with pytest.raises(ValueError, match="Both 'supported_entity' and 'supported_entities'"): + PatternRecognizer.from_dict(recognizer_dict) + + +def test_pattern_recognizer_from_dict_with_supported_entities_only(): + """Test from_dict uses first element of supported_entities.""" + recognizer_dict = { + "supported_entities": ["ENTITY_A", "ENTITY_B"], + "patterns": [{"name": "p1", "score": 0.5, "regex": r"\d+"}], + } + + recognizer = PatternRecognizer.from_dict(recognizer_dict) + assert recognizer.supported_entities == ["ENTITY_A"] + + +def test_pattern_recognizer_from_dict_with_empty_supported_entities(): + """Test from_dict with empty supported_entities list.""" + recognizer_dict = { + "supported_entities": [], + "patterns": [{"name": "p1", "score": 0.5, "regex": r"\d+"}], + } + + # Should raise TypeError 
because supported_entity parameter is missing + with pytest.raises(TypeError): + PatternRecognizer.from_dict(recognizer_dict) + + +def test_pattern_recognizer_analyze_with_custom_regex_flags(): + """Test analyze with custom regex flags.""" + patterns = [Pattern(name="test_pattern", regex=r"test", score=0.8)] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + name="FlagTest", + global_regex_flags=0, # No flags by default + ) + + # Should not match with default flags (case-sensitive) + results = recognizer.analyze("TEST", ["TEST"]) + assert len(results) == 0 + + # Should match with IGNORECASE flag + results = recognizer.analyze("TEST", ["TEST"], regex_flags=re.IGNORECASE) + assert len(results) == 1 + + +def test_pattern_recognizer_multiple_patterns(): + """Test recognizer with multiple patterns.""" + patterns = [ + Pattern(name="pattern1", regex=r"\b\d{3}\b", score=0.6), + Pattern(name="pattern2", regex=r"\b[A-Z]{4}\b", score=0.7), + ] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + name="MultiPatternTest", + global_regex_flags=re.DOTALL | re.MULTILINE + ) + + results = recognizer.analyze("Number 123 and CAPS word", ["TEST"]) + # Should find exactly 2 results (digits and capitals) + assert len(results) == 2 + + # Check that both patterns were matched with correct scores + scores = sorted([r.score for r in results]) + assert scores == [0.6, 0.7] + + +def test_pattern_recognizer_build_regex_explanation(): + """Test build_regex_explanation static method.""" + explanation = PatternRecognizer.build_regex_explanation( + recognizer_name="TestRecognizer", + pattern_name="TestPattern", + pattern=r"\d+", + original_score=0.85, + validation_result=True, + regex_flags=re.IGNORECASE, + ) + + assert explanation.recognizer == "TestRecognizer" + assert explanation.pattern_name == "TestPattern" + assert explanation.pattern == r"\d+" + assert explanation.original_score == 0.85 + assert explanation.validation_result 
is True + assert explanation.regex_flags == re.IGNORECASE + assert "TestRecognizer" in explanation.textual_explanation + assert "TestPattern" in explanation.textual_explanation + + +def test_pattern_recognizer_load_method(): + """Test that load method can be called without error.""" + recognizer = PatternRecognizer( + supported_entity="TEST", + deny_list=["test"], + ) + + # load() should not raise any exception + recognizer.load() + + +def test_pattern_recognizer_with_zero_global_regex_flags(): + """Test PatternRecognizer with 0 as global_regex_flags.""" + patterns = [Pattern(name="test", regex=r"test", score=0.5)] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + global_regex_flags=0, + ) + + # Should work with 0 flags (case-sensitive) + results = recognizer.analyze("test", ["TEST"]) + assert len(results) == 1 + + # Should not match different case + results = recognizer.analyze("TEST", ["TEST"]) + assert len(results) == 0 + + +def test_pattern_recognizer_recompiles_regex_on_flag_change(): + """Test that regex is recompiled when flags change.""" + patterns = [Pattern(name="test", regex=r"test", score=0.5)] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + global_regex_flags=0, + ) + + # First analysis with no flags + results = recognizer.analyze("TEST", ["TEST"], regex_flags=0) + assert len(results) == 0 + + # Second analysis with IGNORECASE flag (should recompile) + results = recognizer.analyze("TEST", ["TEST"], regex_flags=re.IGNORECASE) + assert len(results) == 1 + + +def test_pattern_recognizer_recognizer_metadata(): + """Test that recognition_metadata is properly set in results.""" + patterns = [Pattern(name="test", regex=r"\d+", score=0.5)] + recognizer = PatternRecognizer( + supported_entity="TEST", + patterns=patterns, + name="MetadataTest", + ) + + results = recognizer.analyze("Test 123", ["TEST"]) + assert len(results) == 1 + + metadata = results[0].recognition_metadata + assert 
"recognizer_name" in metadata + assert metadata["recognizer_name"] == "MetadataTest" + assert "recognizer_identifier" in metadata + + diff --git a/presidio-analyzer/tests/test_recognizer_registry_provider.py b/presidio-analyzer/tests/test_recognizer_registry_provider.py index 77ba7d4d96..17771a1d3e 100644 --- a/presidio-analyzer/tests/test_recognizer_registry_provider.py +++ b/presidio-analyzer/tests/test_recognizer_registry_provider.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import List from inspect import signature - from presidio_analyzer.predefined_recognizers import SpacyRecognizer from presidio_analyzer.recognizer_registry import RecognizerRegistryProvider from presidio_analyzer.recognizer_registry.recognizers_loader_utils import RecognizerConfigurationLoader @@ -37,8 +36,8 @@ def test_recognizer_registry_provider_configuration_file(): assert [recognizer.supported_language for recognizer in recognizer_registry.recognizers if recognizer.name == "ItFiscalCodeRecognizer"] == ["en", "es"] assert [recognizer.supported_language for recognizer in recognizer_registry.recognizers if recognizer.name == "CreditCardRecognizer"] == ["en"] assert [recognizer.supported_language for recognizer in recognizer_registry.recognizers if recognizer.name == "ExampleCustomRecognizer"] == ["en", "es"] - snpanish_recognizer = [recognizer for recognizer in recognizer_registry.recognizers if recognizer.name == "ExampleCustomRecognizer" and recognizer.supported_language == "es"][0] - assert snpanish_recognizer.context == ["tarjeta", "credito"] + spanish_recognizer = [recognizer for recognizer in recognizer_registry.recognizers if recognizer.name == "ExampleCustomRecognizer" and recognizer.supported_language == "es"][0] + assert spanish_recognizer.context == ["tarjeta", "credito"] def test_recognizer_registry_provider_configuration_file_load_predefined(mandatory_recognizers): @@ -73,44 +72,15 @@ def test_recognizer_registry_provider_corrupt_conf_file_fail(mandatory_recognize 
def test_recognizer_registry_provider_conf_file_valid_missing_keys_fail(): + """Test that a config file with invalid keys (no mandatory keys) raises an error.""" this_path = Path(__file__).parent.absolute() test_yaml = Path(this_path, "conf/recognizer_configuration_missing_keys.yaml") - with pytest.raises(ValueError): + # Config file with no mandatory keys should raise ValueError + with pytest.raises(ValueError, match="does not contain any of the mandatory keys"): RecognizerRegistryProvider(conf_file=test_yaml) -# def test_recognizer_registry_provider_with_registry_configuration(): -# registry_configuration = { -# "supported_languages": ["de", "es", "en"], -# "recognizers": [ -# { -# "name": "Zip code Recognizer", -# "supported_language": "en", -# "patterns": [ -# { -# "name": "zip code (weak)", -# "regex": "(\\b\\d{5}(?:\\-\\d{4})?\\b)", -# "score": 0.01, -# } -# ], -# "context": ["zip", "code"], -# "supported_entity": "ZIP", -# } -# ] -# } - - # provider = RecognizerRegistryProvider(registry_configuration=registry_configuration) - # recognizer_registry = provider.create_recognizer_registry() - # assert recognizer_registry.supported_languages == ["de", "es", "en"] - # assert recognizer_registry.global_regex_flags == re.DOTALL | re.MULTILINE | re.IGNORECASE - # assert len(recognizer_registry.recognizers) == 1 - # recognizer = recognizer_registry.recognizers[0] - # assert recognizer.name == "Zip code Recognizer" - # assert recognizer.supported_language == "en" - # assert recognizer.supported_entities == ["ZIP"] - # assert len(recognizer.patterns) == 1 - def test_recognizer_registry_provider_when_conf_file_and_registry_configuration_fail(): this_path = Path(__file__).parent.absolute() @@ -129,7 +99,7 @@ def test_recognizer_provider_with_minimal_creates_empty_registry(): provider = RecognizerRegistryProvider(conf_file=minimal_yaml) registry = provider.create_recognizer_registry() - assert len(registry.recognizers) == 0 + assert len(registry.recognizers) == 1 def 
test_recognizer_provider_with_nlp_reco_only_creates_nlp_recognizer(): @@ -149,4 +119,126 @@ def test_default_attributes_equal_recognizer_registry_signature(): registry_provider = RecognizerRegistryProvider() provider_fields = set(RecognizerConfigurationLoader.mandatory_keys) - assert registry_fields == provider_fields \ No newline at end of file + assert registry_fields == provider_fields + + +def test_recognizer_registry_provider_missing_language_config_raises(): + """ + Test that a recognizer configuration without language info gets the default languages. + """ + from presidio_analyzer.recognizer_registry.recognizer_registry_provider import RecognizerRegistryProvider + # Configuration with no supported_languages and no recognizer language + registry_configuration = { + "recognizers": [ + { + "name": "CustomRecognizer", + "type": "custom", + "supported_entity": "CUSTOM_ENTITY", + "patterns": [ + {"name": "custom", "regex": "test", "score": 0.5} + ], + # No supported_language or supported_languages + } + ] + } + # When registry_configuration is passed, it gets merged with defaults + # so supported_languages gets filled in and recognizers get created for default languages + provider = RecognizerRegistryProvider(registry_configuration=registry_configuration) + # Verify that defaults were applied + assert provider.configuration.get("supported_languages") is not None + registry = provider.create_recognizer_registry() + # Verify registry was created successfully with default language + assert len(registry.recognizers) > 0 + + +# Tests for missing required and optional fields in YAML configuration + +def test_missing_recognizers_raises_exception(): + """Test that missing recognizers raises an exception.""" + this_path = Path(__file__).parent.absolute() + conf_file = Path(this_path, "conf/missing_recognizers.yaml") + + with pytest.raises(ValueError) as exc_info: + RecognizerRegistryProvider(conf_file=conf_file) + + assert "recognizers" in str(exc_info.value) + assert 
"mandatory" in str(exc_info.value).lower() + + +def test_missing_global_regex_flags_uses_default(): + """Test that missing global_regex_flags uses default value without error.""" + this_path = Path(__file__).parent.absolute() + conf_file = Path(this_path, "conf/missing_global_regex_flags.yaml") + + # Should not raise an exception + provider = RecognizerRegistryProvider(conf_file=conf_file) + registry = provider.create_recognizer_registry() + + # Check that default value was used (26 = re.DOTALL | re.MULTILINE | re.IGNORECASE) + assert registry.global_regex_flags == 26 + assert registry.supported_languages == ["en"] + + +def test_valid_configuration_passes(): + """Test that a valid configuration passes validation.""" + from presidio_analyzer.input_validation import ConfigurationValidator + + config = { + "supported_languages": ["en", "es"], + "recognizers": ["CreditCardRecognizer", "EmailRecognizer"], + "global_regex_flags": 26, + } + + validated = ConfigurationValidator.validate_recognizer_registry_configuration(config) + + assert validated is not None + assert validated["supported_languages"] == ["en", "es"] + assert validated["global_regex_flags"] == 26 + + +def test_valid_configuration_without_global_regex_flags(): + """Test that configuration without global_regex_flags uses default without error.""" + from presidio_analyzer.input_validation import ConfigurationValidator + + config = { + "supported_languages": ["en"], + "recognizers": ["CreditCardRecognizer"], + } + + # Should not raise an exception + validated = ConfigurationValidator.validate_recognizer_registry_configuration(config) + + # Check default value was set + assert validated["global_regex_flags"] == 26 + assert validated["supported_languages"] == ["en"] + + +def test_recognizers_none_raises_exception(): + """Test that recognizers explicitly set to None raises an exception.""" + from presidio_analyzer.input_validation import ConfigurationValidator + + config = { + "supported_languages": ["en"], + 
"recognizers": None, + "global_regex_flags": 26, + } + + with pytest.raises(ValueError) as exc_info: + ConfigurationValidator.validate_recognizer_registry_configuration(config) + + +def test_direct_validation_with_missing_global_regex_flags(): + """Test direct validation without global_regex_flags succeeds with default.""" + from presidio_analyzer.input_validation import ConfigurationValidator + + config = { + "supported_languages": ["en"], + "recognizers": ["CreditCardRecognizer"], + } + + # Should not raise an exception + validated = ConfigurationValidator.validate_recognizer_registry_configuration(config) + + # Verify default value and successful creation + assert validated["global_regex_flags"] == 26 + assert validated["supported_languages"] == ["en"] diff --git a/presidio-analyzer/tests/test_yaml_recognizer_models.py b/presidio-analyzer/tests/test_yaml_recognizer_models.py new file mode 100644 index 0000000000..425c49105f --- /dev/null +++ b/presidio-analyzer/tests/test_yaml_recognizer_models.py @@ -0,0 +1,599 @@ +"""Tests for YAML recognizer configuration models.""" + +import pytest +from pydantic import ValidationError + +from presidio_analyzer.input_validation.yaml_recognizer_models import ( + BaseRecognizerConfig, + CustomRecognizerConfig, + LanguageContextConfig, + PredefinedRecognizerConfig, + RecognizerRegistryConfig, +) + + +def test_language_context_config_valid(): + """Test LanguageContextConfig validates correctly.""" + lang_config = LanguageContextConfig( + language="en", + context=["credit", "card"] + ) + assert lang_config.language == "en" + assert lang_config.context == ["credit", "card"] + + +def test_language_context_config_valid_with_region(): + """Test LanguageContextConfig with region code.""" + lang_config = LanguageContextConfig( + language="en-US", + context=["social", "security"] + ) + assert lang_config.language == "en-US" + assert lang_config.context == ["social", "security"] + + +def test_language_context_config_no_context(): + 
"""Test LanguageContextConfig without context.""" + lang_config = LanguageContextConfig(language="es") + assert lang_config.language == "es" + assert lang_config.context is None + + +def test_language_context_config_invalid_language(): + """Test LanguageContextConfig rejects invalid language codes.""" + with pytest.raises(ValidationError) as exc_info: + LanguageContextConfig(language="invalid") + assert "Invalid language code format" in str(exc_info.value) + + +def test_language_context_config_invalid_format(): + """Test various invalid language formats.""" + invalid_languages = ["e", "eng", "EN", "en-us", "en-USA", "123", ""] + + for lang in invalid_languages: + with pytest.raises(ValidationError): + LanguageContextConfig(language=lang) + + +def test_base_recognizer_config_minimal(): + """Test minimal valid configuration.""" + config = BaseRecognizerConfig(name="test_recognizer") + assert config.name == "test_recognizer" + assert config.enabled is True + assert config.type == "predefined" + + +def test_base_recognizer_config_full(): + """Test full configuration with all fields.""" + config = BaseRecognizerConfig( + name="test_recognizer", + enabled=False, + type="custom", + supported_language="en", + context=["test", "context"], + supported_entity="TEST_ENTITY" + ) + assert config.name == "test_recognizer" + assert config.enabled is False + assert config.type == "custom" + assert config.supported_language == "en" # Preserved as-is + assert config.supported_languages is None + assert config.context == ["test", "context"] + assert config.supported_entity == "TEST_ENTITY" # Preserved as-is + assert config.supported_entities is None + + +def test_language_fields_preserved(): + """Test that supported_language is preserved as-is (not normalized).""" + config = BaseRecognizerConfig( + name="test", + supported_language="en" + ) + assert config.supported_language == "en" + assert config.supported_languages is None + + +def test_entity_fields_preserved(): + """Test that 
supported_entity is preserved as-is (not normalized).""" + config = BaseRecognizerConfig( + name="test", + supported_entity="PERSON" + ) + assert config.supported_entity == "PERSON" + assert config.supported_entities is None + + +def test_cannot_specify_both_language_formats(): + """Test that specifying both language formats raises error.""" + with pytest.raises(ValidationError) as exc_info: + BaseRecognizerConfig( + name="test", + supported_language="en", + supported_languages=["es", "fr"] + ) + assert "Cannot specify both 'supported_language' and 'supported_languages'" in str(exc_info.value) + + +def test_cannot_specify_both_entity_formats(): + """Test that specifying both entity formats raises error.""" + with pytest.raises(ValidationError) as exc_info: + BaseRecognizerConfig( + name="test", + supported_entity="PERSON", + supported_entities=["LOCATION", "ORG"] + ) + assert "has both 'supported_entity' and 'supported_entities' specified" in str(exc_info.value) + + +def test_invalid_single_language_format(): + """Test validation of single language format.""" + with pytest.raises(ValidationError): + BaseRecognizerConfig( + name="test", + supported_language="invalid" + ) + + +def test_context_with_multiple_languages_error(): + """Test that global context with multiple languages raises error.""" + with pytest.raises(ValidationError) as exc_info: + BaseRecognizerConfig( + name="test", + supported_languages=["en", "es"], + context=["global", "context"] + ) + assert "Global context can only be used with a single language" in str(exc_info.value) + + +def test_context_with_single_language_valid(): + """Test that global context with single language is valid.""" + config = BaseRecognizerConfig( + name="test", + supported_languages=["en"], + context=["global", "context"] + ) + assert config.context == ["global", "context"] + + +def test_predefined_recognizer_config_defaults(): + """Test predefined recognizer with defaults.""" + config = 
PredefinedRecognizerConfig(name="CreditCardRecognizer") + assert config.name == "CreditCardRecognizer" + assert config.type == "predefined" + assert config.enabled is True + + +def test_predefined_recognizer_config_with_language(): + """Test predefined recognizer with language specification.""" + config = PredefinedRecognizerConfig( + name="CreditCardRecognizer", + supported_language="en" + ) + assert config.supported_language == "en" + assert config.supported_languages is None + + +def test_custom_recognizer_config_with_patterns(): + """Test custom recognizer with patterns.""" + patterns = [ + { + "name": "test_pattern", + "regex": r"\b\d{4}-\d{4}-\d{4}-\d{4}\b", + "score": 0.8 + } + ] + config = CustomRecognizerConfig( + name="custom_test", + supported_entity="CUSTOM_ENTITY", + patterns=patterns + ) + assert config.name == "custom_test" + assert config.type == "custom" + assert config.supported_entity == "CUSTOM_ENTITY" + assert config.supported_entities is None + assert config.patterns == patterns + + +def test_custom_recognizer_config_with_deny_list(): + """Test custom recognizer with deny list only.""" + config = CustomRecognizerConfig( + name="custom_test", + supported_entity="CUSTOM_ENTITY", + deny_list=["exclude", "this"], + deny_list_score=0.1 + ) + assert config.deny_list == ["exclude", "this"] + assert config.deny_list_score == 0.1 + + +def test_custom_recognizer_config_invalid_patterns_not_list(): + """Test that patterns must be a list.""" + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + patterns="not a list" + ) + + +def test_custom_recognizer_config_invalid_pattern_not_dict(): + """Test that each pattern must be a dict.""" + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + patterns=["not a dict"] + ) + + +def test_custom_recognizer_config_pattern_missing_fields(): + """Test that patterns must have required 
fields.""" + required_fields = ["name", "regex", "score"] + + for field in required_fields: + pattern = {"name": "test", "regex": r"\d+", "score": 0.5} + del pattern[field] + + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + patterns=[pattern] + ) + + +def test_custom_recognizer_config_invalid_score_type(): + """Test that pattern score must be float.""" + pattern = { + "name": "test", + "regex": r"\d+", + "score": "not a float" + } + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + patterns=[pattern] + ) + + +def test_custom_recognizer_config_invalid_score_range(): + """Test that pattern score must be between 0 and 1.""" + invalid_scores = [-0.1, 1.1, 2.0] + + for score in invalid_scores: + pattern = { + "name": "test", + "regex": r"\d+", + "score": score + } + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + patterns=[pattern] + ) + + +def test_custom_recognizer_config_no_patterns_or_deny_list(): + """Test that custom recognizer must have patterns or deny_list.""" + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="test", + supported_entity="TEST" + ) + + +def test_custom_recognizer_config_invalid_deny_list_score(): + """Test deny_list_score validation.""" + with pytest.raises(ValidationError): + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + deny_list=["test"], + deny_list_score=1.5 # Invalid: > 1.0 + ) + + with pytest.raises(ValidationError): + CustomRecognizerConfig( + name="test", + supported_entity="TEST", + deny_list=["test"], + deny_list_score=-0.1 # Invalid: < 0.0 + ) + + +def test_recognizer_registry_config_defaults(): + """Test registry config with defaults (requires at least one recognizer).""" + config = RecognizerRegistryConfig(recognizers=["CreditCardRecognizer"]) + assert 
config.supported_languages is None + assert config.global_regex_flags == 26 + assert len(config.recognizers) == 1 + + +def test_recognizer_registry_config_valid_languages(): + """Test registry with valid languages.""" + config = RecognizerRegistryConfig( + supported_languages=["en", "es", "fr-CA"], + recognizers=["CreditCardRecognizer"] + ) + assert config.supported_languages == ["en", "es", "fr-CA"] + + +def test_recognizer_registry_config_invalid_language(): + """Test registry with invalid language codes.""" + with pytest.raises(ValidationError): + RecognizerRegistryConfig( + supported_languages=["en", "invalid", "es"], + recognizers=["CreditCardRecognizer"] + ) + + +def test_recognizer_registry_config_empty_languages(): + """Test registry with empty languages list.""" + config = RecognizerRegistryConfig( + supported_languages=[], + recognizers=["CreditCardRecognizer"] + ) + assert config.supported_languages == [] + + +def test_recognizer_registry_config_empty_recognizers(): + """Test that empty recognizers list raises a validation error.""" + with pytest.raises(ValidationError) as exc_info: + RecognizerRegistryConfig( + recognizers=[], + global_regex_flags=26 + ) + assert "empty recognizers list" in str(exc_info.value).lower() + + +def test_recognizer_registry_config_missing_recognizers(): + """Test that missing recognizers field raises a validation error.""" + with pytest.raises(ValidationError) as exc_info: + RecognizerRegistryConfig( + supported_languages=["en"], + global_regex_flags=26 + ) + assert "empty recognizers list" in str(exc_info.value).lower() + + +def test_recognizer_registry_config_string_recognizers(): + """Test registry with string recognizers.""" + config = RecognizerRegistryConfig( + recognizers=["credit_card", "email", "phone_number"] + ) + assert len(config.recognizers) == 3 + assert all(isinstance(r, str) for r in config.recognizers) + + +def test_recognizer_registry_config_mixed_recognizers(): + """Test registry with mixed recognizer 
types and missing languages should fail.""" + custom_config = { + "name": "custom_test", + "type": "custom", + "supported_entity": "TEST", + "patterns": [{"name": "test", "regex": r"\d+", "score": 0.5}] + } + + with pytest.raises(ValidationError) as exc_info: + RecognizerRegistryConfig( + recognizers=[ + "credit_card", # string predefined + {"name": "UrlRecognizer", "type": "predefined"}, # predefined + custom_config # custom without languages should trigger error + ] + ) + assert "Language configuration missing" in str(exc_info.value) + + +def test_recognizer_registry_config_only_predefined_no_languages(): + """Predefined recognizers without languages should be allowed (use defaults).""" + config = RecognizerRegistryConfig( + recognizers=[ + "credit_card", + {"name": "UrlRecognizer", "type": "predefined"}, + ] + ) + assert len(config.recognizers) == 2 + assert isinstance(config.recognizers[0], str) + assert isinstance(config.recognizers[1], PredefinedRecognizerConfig) + + +def test_recognizer_registry_config_auto_detect_type(): + """Test auto-detection of recognizer type based on patterns and deny_list.""" + # Should be detected as custom due to patterns + custom_with_patterns_config = { + "name": "auto_custom_patterns", + "supported_entity": "TEST", + "supported_language": "en", + "patterns": [{"name": "test", "regex": r"\d+", "score": 0.5}] + } + + # Should be detected as custom due to deny_list + custom_with_deny_list_config = { + "name": "auto_custom_deny", + "supported_entity": "TEST", + "supported_language": "en", + "deny_list": ["exclude_this"] + } + + # Should be detected as predefined (no patterns or deny_list) + predefined_config = { + "name": "UrlRecognizer", + "enabled": True + } + + config = RecognizerRegistryConfig( + supported_languages=["en"], # Add global language to satisfy new validation + recognizers=[custom_with_patterns_config, custom_with_deny_list_config, predefined_config] + ) + + assert isinstance(config.recognizers[0], 
CustomRecognizerConfig) + assert config.recognizers[0].type == "custom" + assert isinstance(config.recognizers[1], CustomRecognizerConfig) + assert config.recognizers[1].type == "custom" + assert isinstance(config.recognizers[2], PredefinedRecognizerConfig) + assert config.recognizers[2].type == "predefined" + + + +def test_complete_registry_scenario(): + """Test a complete registry configuration scenario.""" + registry_config = { + "supported_languages": ["en", "es"], + "recognizers": [ + "credit_card", # String recognizer (kept as string) + { + "name": "EmailRecognizer", + "type": "predefined", + "enabled": True + }, + { + "name": "custom_pattern", + "type": "custom", + "supported_entity": "CUSTOM_ID", + "supported_language": "en", + "patterns": [ + { + "name": "id_pattern", + "regex": r"ID-\d{6}", + "score": 0.9 + } + ] + } + ] + } + + config = RecognizerRegistryConfig(**registry_config) + assert len(config.recognizers) == 3 + assert isinstance(config.recognizers[0], str) + assert isinstance(config.recognizers[1], PredefinedRecognizerConfig) + assert isinstance(config.recognizers[2], CustomRecognizerConfig) + + + +def test_error_handling_cascade(): + """Test that validation errors are properly cascaded.""" + # This should fail at the CustomRecognizerConfig level + with pytest.raises(ValidationError) as exc_info: + RecognizerRegistryConfig( + recognizers=[ + { + "name": "invalid_custom", + "type": "custom", + "supported_entity": "TEST", + "supported_language": "en", # Add language to avoid that error + "patterns": [ + { + "name": "test", + "regex": r"\d+", + "score": 2.0 # Invalid score > 1.0 + } + ] + } + ] + ) + assert "Pattern score should be between 0 and 1" in str(exc_info.value) + + +def test_predefined_recognizer_config_valid_recognizer(): + """Test predefined recognizer with valid recognizer name.""" + # Test with a common recognizer that should exist + config = PredefinedRecognizerConfig(name="CreditCardRecognizer") + assert config.name == 
"CreditCardRecognizer" + assert config.type == "predefined" + + +def test_predefined_recognizer_config_invalid_recognizer(): + """Test predefined recognizer with invalid recognizer name.""" + with pytest.raises(ValidationError) as exc_info: + PredefinedRecognizerConfig(name="NonExistentRecognizer") + + +def test_predefined_recognizer_config_case_sensitive(): + """Test that recognizer names are case sensitive.""" + with pytest.raises(ValidationError) as exc_info: + PredefinedRecognizerConfig(name="creditcardrecognizer") # lowercase + + error_message = str(exc_info.value) + assert "Predefined recognizer 'creditcardrecognizer' not found" in error_message + + +def test_custom_recognizer_config_predefined_name_error(): + """Test that using a predefined recognizer name for custom recognizer raises error.""" + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="CreditCardRecognizer", # This is a predefined recognizer + type="custom", + supported_entity="CREDIT_CARD", + patterns=[{"name": "test", "regex": r"\d+", "score": 0.5}] + ) + + error_message = str(exc_info.value) + assert "Recognizer 'CreditCardRecognizer' conflicts with a predefined" in error_message + assert "Either use type: 'predefined' or choose a different name" in error_message + + +def test_custom_recognizer_config_predefined_name_error_without_required_fields(): + """Test that predefined name conflict is caught even when missing required fields.""" + with pytest.raises(ValidationError) as exc_info: + CustomRecognizerConfig( + name="UrlRecognizer", # This is a predefined recognizer + type="custom" + # Intentionally missing supported_entity, patterns, and deny_list + ) + + error_message = str(exc_info.value) + assert "conflicts with a predefined recognizer" in error_message or \ + "is a predefined recognizer but is marked as 'custom'" in error_message + + +def test_custom_recognizer_config_unique_name_valid(): + """Test that custom recognizers with unique names are valid.""" + 
config = CustomRecognizerConfig( + name="MyCustomRecognizer", # This should not exist as predefined + type="custom", + supported_entity="CUSTOM_ENTITY", + patterns=[{"name": "test", "regex": r"\d+", "score": 0.5}] + ) + assert config.name == "MyCustomRecognizer" + assert config.type == "custom" + + +def test_custom_recognizer_config_predefined_name_validation_with_import_error(): + """Test that custom recognizers with unique names (not predefined) are valid. + + This test verifies that a custom recognizer can use a name that doesn't + conflict with any predefined recognizers. + """ + config = CustomRecognizerConfig( + name="SomeUniqueRecognizer", + type="custom", + supported_entity="TEST", + patterns=[{"name": "test", "regex": r"\d+", "score": 0.5}] + ) + assert config.name == "SomeUniqueRecognizer" + assert config.type == "custom" + + +def test_custom_recognizer_with_language_no_global_languages(): + """Custom recognizer specifying its own language should pass without global languages.""" + registry_config = { + "recognizers": [ + { + "name": "my_custom_with_lang", + "type": "custom", + "supported_entity": "TEST", + "supported_language": "en", + "patterns": [ + {"name": "p", "regex": r"\d+", "score": 0.5} + ] + } + ] + } + config = RecognizerRegistryConfig(**registry_config) + assert len(config.recognizers) == 1 + assert isinstance(config.recognizers[0], CustomRecognizerConfig) + assert config.recognizers[0].supported_language == "en" + assert config.recognizers[0].supported_languages is None