Skip to content
14 changes: 13 additions & 1 deletion presidio-analyzer/presidio_analyzer/analyzer_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os
from collections import Counter
from typing import List, Optional

Expand All @@ -22,6 +23,7 @@

logger = logging.getLogger("presidio-analyzer")

# Timeout, in seconds, handed to the `timeout=` argument of regex calls in
# this module. Read once at import time from the REGEX_TIMEOUT_SECONDS
# environment variable (default 60); the value is parsed with int(), so a
# non-integer setting raises ValueError on import.
REGEX_TIMEOUT_SECONDS = int(os.getenv("REGEX_TIMEOUT_SECONDS", 60))

class AnalyzerEngine:
"""
Expand Down Expand Up @@ -371,7 +373,17 @@ def _remove_allow_list(
word = text[result.start : result.end]

# if the word is not specified to be allowed, keep in the PII entities
if not re_compiled.search(word):
try:
if not re_compiled.search(word, timeout=REGEX_TIMEOUT_SECONDS):
new_results.append(result)
except TimeoutError:
logger.warning(
"Allow list regex timed out after %s seconds"
" (word length: %d), keeping result.",
REGEX_TIMEOUT_SECONDS,
len(word),
exc_info=True,
)
new_results.append(result)
elif allow_list_match == "exact":
for result in results:
Expand Down
109 changes: 61 additions & 48 deletions presidio-analyzer/presidio_analyzer/pattern_recognizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
import os
from typing import TYPE_CHECKING, Dict, List, Optional

import regex as re
Expand All @@ -17,6 +18,8 @@

logger = logging.getLogger("presidio-analyzer")

# Timeout, in seconds, handed to the `timeout=` argument of pattern matching
# in this module. Read once at import time from the REGEX_TIMEOUT_SECONDS
# environment variable (default 60); the value is parsed with int(), so a
# non-integer setting raises ValueError on import.
REGEX_TIMEOUT_SECONDS = int(os.getenv("REGEX_TIMEOUT_SECONDS", 60))


class PatternRecognizer(LocalRecognizer):
"""
Expand Down Expand Up @@ -195,60 +198,70 @@ def __analyze_patterns(
pattern.compiled_with_flags = flags
pattern.compiled_regex = re.compile(pattern.regex, flags=flags)

matches = pattern.compiled_regex.finditer(text)
match_time = datetime.datetime.now() - match_start_time
logger.debug(
"--- match_time[%s]: %.6f seconds",
pattern.name,
match_time.total_seconds(),
)

for match in matches:
start, end = match.span()
current_match = text[start:end]

# Skip empty results
if current_match == "":
continue

score = pattern.score

validation_result = self.validate_result(current_match)
description = self.build_regex_explanation(
self.name,
pattern.name,
pattern.regex,
score,
validation_result,
flags,
try:
matches = pattern.compiled_regex.finditer(
text, timeout=REGEX_TIMEOUT_SECONDS
)
pattern_result = RecognizerResult(
entity_type=self.supported_entities[0],
start=start,
end=end,
score=score,
analysis_explanation=description,
recognition_metadata={
RecognizerResult.RECOGNIZER_NAME_KEY: self.name,
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY: self.id,
},
match_time = datetime.datetime.now() - match_start_time
logger.debug(
"--- match_time[%s]: %.6f seconds",
pattern.name,
match_time.total_seconds(),
)

if validation_result is not None:
if validation_result:
pattern_result.score = EntityRecognizer.MAX_SCORE
else:
for match in matches:
start, end = match.span()
current_match = text[start:end]

# Skip empty results
if current_match == "":
continue

score = pattern.score

validation_result = self.validate_result(current_match)
description = self.build_regex_explanation(
self.name,
pattern.name,
pattern.regex,
score,
validation_result,
flags,
)
pattern_result = RecognizerResult(
entity_type=self.supported_entities[0],
start=start,
end=end,
score=score,
analysis_explanation=description,
recognition_metadata={
RecognizerResult.RECOGNIZER_NAME_KEY: self.name,
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY: self.id,
},
)

if validation_result is not None:
if validation_result:
pattern_result.score = EntityRecognizer.MAX_SCORE
else:
pattern_result.score = EntityRecognizer.MIN_SCORE

invalidation_result = self.invalidate_result(current_match)
if invalidation_result is not None and invalidation_result:
pattern_result.score = EntityRecognizer.MIN_SCORE

invalidation_result = self.invalidate_result(current_match)
if invalidation_result is not None and invalidation_result:
pattern_result.score = EntityRecognizer.MIN_SCORE

if pattern_result.score > EntityRecognizer.MIN_SCORE:
results.append(pattern_result)
if pattern_result.score > EntityRecognizer.MIN_SCORE:
results.append(pattern_result)

# Update analysis explanation score following validation or invalidation
description.score = pattern_result.score
# Update analysis explanation score after validation or invalidation
description.score = pattern_result.score
except TimeoutError:
logger.warning(
"Regex pattern '%s' timed out after %s seconds, skipping.",
pattern.name,
REGEX_TIMEOUT_SECONDS,
exc_info=True,
)

results = EntityRecognizer.remove_duplicates(results)
return results
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
import string
from typing import Dict, List, Optional, Tuple

Expand All @@ -19,6 +20,7 @@

logger = logging.getLogger("presidio-analyzer")

# Timeout, in seconds, handed to the `timeout=` argument of the regex calls
# in this module (pattern scanning and IBAN format validation). Read once at
# import time from the REGEX_TIMEOUT_SECONDS environment variable (default
# 60); the value is parsed with int(), so a non-integer setting raises
# ValueError on import.
REGEX_TIMEOUT_SECONDS = int(os.getenv("REGEX_TIMEOUT_SECONDS", 60))

class IbanRecognizer(PatternRecognizer):
"""
Expand Down Expand Up @@ -144,54 +146,64 @@ def __analyze_patterns(self, text: str, flags: int = None):
flags = flags if flags else self.global_regex_flags
results = []
for pattern in self.patterns:
matches = re.finditer(pattern.regex, text, flags=flags)

for match in matches:
for grp_num in reversed(range(1, len(match.groups()) + 1)):
start = match.span(0)[0]
end = (
match.span(grp_num)[1]
if match.span(grp_num)[1] > 0
else match.span(0)[1]
)
current_match = text[start:end]

# Skip empty results
if current_match == "":
continue

score = pattern.score

validation_result = self.validate_result(current_match)
description = PatternRecognizer.build_regex_explanation(
self.name,
pattern.name,
pattern.regex,
score,
validation_result,
flags,
)
pattern_result = RecognizerResult(
entity_type=self.supported_entities[0],
start=start,
end=end,
score=score,
analysis_explanation=description,
recognition_metadata={
RecognizerResult.RECOGNIZER_NAME_KEY: self.name,
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY: self.id,
},
)

if validation_result is not None:
if validation_result:
pattern_result.score = EntityRecognizer.MAX_SCORE
else:
pattern_result.score = EntityRecognizer.MIN_SCORE

if pattern_result.score > EntityRecognizer.MIN_SCORE:
results.append(pattern_result)
break
try:
matches = re.finditer(
pattern.regex, text, flags=flags, timeout=REGEX_TIMEOUT_SECONDS
)

for match in matches:
for grp_num in reversed(range(1, len(match.groups()) + 1)):
start = match.span(0)[0]
end = (
match.span(grp_num)[1]
if match.span(grp_num)[1] > 0
else match.span(0)[1]
)
current_match = text[start:end]

# Skip empty results
if current_match == "":
continue

score = pattern.score

validation_result = self.validate_result(current_match)
description = PatternRecognizer.build_regex_explanation(
self.name,
pattern.name,
pattern.regex,
score,
validation_result,
flags,
)
pattern_result = RecognizerResult(
entity_type=self.supported_entities[0],
start=start,
end=end,
score=score,
analysis_explanation=description,
recognition_metadata={
RecognizerResult.RECOGNIZER_NAME_KEY: self.name,
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY: self.id,
},
)

if validation_result is not None:
if validation_result:
pattern_result.score = EntityRecognizer.MAX_SCORE
else:
pattern_result.score = EntityRecognizer.MIN_SCORE

if pattern_result.score > EntityRecognizer.MIN_SCORE:
results.append(pattern_result)
break
except TimeoutError:
logger.warning(
"Regex pattern '%s' timed out after %s seconds, skipping.",
pattern.name,
REGEX_TIMEOUT_SECONDS,
exc_info=True,
)

return results

Expand All @@ -216,6 +228,16 @@ def __is_valid_format(
country_regex = regex_per_country.get(country_code, "")
if bos_eos and country_regex:
country_regex = bos_eos[0] + country_regex + bos_eos[1]
return country_regex and re.match(country_regex, iban, flags=flags)
try:
return country_regex and re.match(
country_regex, iban, flags=flags, timeout=REGEX_TIMEOUT_SECONDS
)
except TimeoutError:
logger.warning(
"IBAN format validation regex timed out after %s seconds.",
REGEX_TIMEOUT_SECONDS,
exc_info=True,
)
return False

return False
39 changes: 31 additions & 8 deletions presidio-analyzer/tests/test_analyzer_engine.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
import copy
import re
from abc import ABC
from contextlib import nullcontext
from typing import List, Optional
import re
from unittest.mock import patch

import pytest

from presidio_analyzer import (
AnalyzerEngine,
PatternRecognizer,
EntityRecognizer,
Pattern,
PatternRecognizer,
RecognizerRegistry,
EntityRecognizer,
RecognizerResult,
)
from presidio_analyzer.nlp_engine import (
NlpArtifacts,
SpacyNlpEngine,
)
from presidio_analyzer.recognizer_registry import (
RecognizerRegistryProvider
)
from presidio_analyzer.recognizer_registry import RecognizerRegistryProvider

# noqa: F401
from tests import assert_result
from tests.mocks import NlpEngineMock, AppTracerMock, RecognizerRegistryMock
from tests.mocks import AppTracerMock, NlpEngineMock, RecognizerRegistryMock


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -935,3 +933,28 @@ def test_when_multiple_nameless_recognizers_context_is_correct(spacy_nlp_engine)

for recognizer_result in recognizer_results:
assert recognizer_result.score > 0.3


def test_when_regex_allow_list_times_out_then_result_is_kept(loaded_analyzer_engine):
    """Check that an allow-list regex timeout keeps the detected entity.

    The compiled allow-list pattern is replaced by a mock whose ``search``
    raises ``TimeoutError``; the analyzer is then expected to keep the
    result rather than filter it out (conservative behavior).
    """
    text = "bing.com is his favorite website"

    timeout_target = "presidio_analyzer.analyzer_engine.REGEX_TIMEOUT_SECONDS"
    compile_target = "presidio_analyzer.analyzer_engine.re.compile"

    # NOTE(review): `compile_target` replaces `compile` on the module object
    # that analyzer_engine imports as `re`, so any other code compiling
    # patterns through that same module during this call would also get the
    # mock -- confirm this does not affect how the URL entity is detected.
    with patch(timeout_target, 0.001), patch(compile_target) as mock_compile:
        # Every search on the "compiled" allow-list pattern times out.
        mock_compile.return_value.search.side_effect = TimeoutError(
            "regex timed out"
        )

        results = loaded_analyzer_engine.analyze(
            text=text,
            language="en",
            allow_list=["bing"],
            allow_list_match="regex",
        )

    # Result should be kept on timeout (not filtered out)
    detected_entity_types = {result.entity_type for result in results}
    assert "URL" in detected_entity_types

Loading
Loading