diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index eb2e322bac..22e1cf83d8 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -2691,6 +2691,11 @@ def get_waf_strings(): return [ "The requested URL was rejected", "This content has been blocked", + "You don't have permission to access ", + "The URL you requested has been blocked", + "Request unsuccessful. Incapsula incident", + "Access Denied - Sucuri Website Firewall", + "Attention Required! | Cloudflare", ] diff --git a/bbot/defaults.yml b/bbot/defaults.yml index 3856a1f644..c04b3f2549 100644 --- a/bbot/defaults.yml +++ b/bbot/defaults.yml @@ -262,6 +262,24 @@ parameter_blacklist: - PHPSESSID - __cf_bm - f5_cspm + # CSRF / Anti-Forgery tokens + - authenticity_token + - csrfmiddlewaretoken + - __RequestVerificationToken + - antiforgerytoken + - __csrf_magic + - _wpnonce + # ASP.NET session/identity cookies + - .ASPXANONYMOUS + - .ASPXAUTH + # PKCE (Proof Key for Code Exchange) + - code_verifier + - code_challenge + # Akamai Bot Manager + - _abck + - bm_sz + - bm_sv + - ak_bmsc parameter_blacklist_prefixes: - TS01 diff --git a/bbot/modules/bypass403.py b/bbot/modules/bypass403.py index 65cae080b4..4a4b0ec473 100644 --- a/bbot/modules/bypass403.py +++ b/bbot/modules/bypass403.py @@ -1,5 +1,6 @@ from bbot.errors import HttpCompareError from bbot.modules.base import BaseModule +from bbot.core.helpers.misc import get_waf_strings """ Port of https://github.com/iamj0ker/bypass-403/ and https://portswigger.net/bappstore/444407b96d9c4de0adb7aed89e826122 @@ -80,6 +81,10 @@ class bypass403(BaseModule): meta = {"description": "Check 403 pages for common bypasses", "created_date": "2022-07-05", "author": "@liquidsec"} in_scope_only = True + async def setup(self): + self.waf_yara_rules = self.helpers.yara.compile_strings(get_waf_strings(), nocase=True) + return True + async def do_checks(self, compare_helper, event, collapse_threshold): results = set() error_count = 0 @@ -105,10 +110,10 @@ async def do_checks(self, compare_helper, event, collapse_threshold): # In some cases WAFs will respond with a 200 code which causes a false positive if subject_response is not None: - for waf_string in self.helpers.get_waf_strings(): - if waf_string in subject_response.text: - self.debug("Rejecting result based on presence of WAF string") - return + waf_matches = await self.helpers.yara.match(self.waf_yara_rules, subject_response.text) + if waf_matches: + self.debug("Rejecting result based on presence of WAF string") + return if match is False: if str(subject_response.status_code)[0] != "4": diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 9db511c90b..364479c74f 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -1250,26 +1250,36 @@ async def handle_event(self, event, **kwargs): # Try to extract parameters from the redirect URL if self.parameter_extraction: - for ( - method, - parsed_url, - parameter_name, - original_value, - regex_name, - additional_params, - ) in extract_params_location(header_value, event.parsed_url): - if self.in_bl(parameter_name) is False: - await self.emit_web_parameter( - host=parsed_url.hostname, - param_type="GETPARAM", - name=parameter_name, - original_value=original_value, - url=self.url_unparse("GETPARAM", parsed_url), - description=f"HTTP Extracted Parameter [{parameter_name}] (Location Header)", - additional_params=additional_params, - event=event, - context=f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it", - ) + # Don't extract parameters from out-of-scope redirects — + # they would inherit in-scope status from the parent event + # and cause lightfuzz to fuzz external endpoints + redirect_parsed = urlparse(redirect_location) + redirect_host = redirect_parsed.hostname + if redirect_host and not self.scan.in_scope(redirect_host): + self.debug( + f"Skipping parameter extraction from out-of-scope redirect to {redirect_host}" + ) + else: + for ( + method, + parsed_url, + parameter_name, + original_value, + regex_name, + additional_params, + ) in extract_params_location(header_value, event.parsed_url): + if self.in_bl(parameter_name) is False: + await self.emit_web_parameter( + host=parsed_url.hostname, + param_type="GETPARAM", + name=parameter_name, + original_value=original_value, + url=self.url_unparse("GETPARAM", parsed_url), + description=f"HTTP Extracted Parameter [{parameter_name}] (Location Header)", + additional_params=additional_params, + event=event, + context=f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it", + ) else: self.warning("location header found but missing redirect_location in HTTP_RESPONSE") if header.lower() == "content-type": diff --git a/bbot/modules/lightfuzz/lightfuzz.py b/bbot/modules/lightfuzz/lightfuzz.py index 231081006f..a53e58c4be 100644 --- a/bbot/modules/lightfuzz/lightfuzz.py +++ b/bbot/modules/lightfuzz/lightfuzz.py @@ -2,6 +2,7 @@ from bbot.modules.base import BaseModule from bbot.errors import InteractshError +from bbot.core.helpers.misc import get_waf_strings class lightfuzz(BaseModule): @@ -60,6 +61,15 @@ async def setup(self): return False, f"Invalid Lightfuzz submodule ({submodule_name}) specified in enabled_modules" self.submodules[submodule_name] = submodule_class + waf_strings = get_waf_strings() + self.waf_yara_rules = self.helpers.yara.compile_strings(waf_strings, nocase=True) + # Serial submodule needs WAF + general error strings in one rule + from bbot.modules.lightfuzz.submodules.serial import serial + + self.serial_general_error_yara_rules = self.helpers.yara.compile_strings( + serial.GENERAL_ERROR_STRINGS + waf_strings, nocase=True + ) + interactsh_needed = any(submodule.uses_interactsh for submodule in self.submodules.values()) if interactsh_needed and not self.interactsh_disable: try: diff --git a/bbot/modules/lightfuzz/submodules/crypto.py b/bbot/modules/lightfuzz/submodules/crypto.py index b91069a736..56bb0a67f5 100644 --- a/bbot/modules/lightfuzz/submodules/crypto.py +++ b/bbot/modules/lightfuzz/submodules/crypto.py @@ -26,6 +26,12 @@ class crypto(BaseLightfuzz): * Padding Oracle Vulnerabilities: - Identifies the presence of cryptographic oracles that could be exploited to arbitrary decrypt or encrypt data for the parameter value. + * ECB Mode Detection: + - Passively detects ECB mode encryption by checking for repeated ciphertext blocks in parameter values (zero HTTP requests). + + * CBC Bit-Flipping Detection: + - Actively tests whether mutating different byte positions in the penultimate ciphertext block produces distinguishable server responses, + indicating CBC mode without integrity protection (2 HTTP requests). """ @@ -63,7 +69,6 @@ def is_base64(s): "key does not exist", "the parameter is incorrect", "cryptography exception", - "access denied", "unknown error", "invalid provider type", "no valid cert found", @@ -174,7 +179,7 @@ def modify_string(input_string, action="truncate", position=None, extension_leng if action == "truncate": modified_data = data[:-1] # Remove the last byte elif action == "mutate": - if not position: + if position is None: position = len(data) // 2 if position < 0 or position >= len(data): raise ValueError("Position out of range") @@ -184,7 +189,7 @@ def modify_string(input_string, action="truncate", position=None, extension_leng elif action == "extend": modified_data = data + (b"\x00" * extension_length) elif action == "flip": - if not position: + if position is None: position = len(data) // 2 if position < 0 or position >= len(data): raise ValueError("Position out of range") @@ -222,6 +227,93 @@ def possible_block_sizes(ciphertext_length): possible_sizes.append(block_size) return possible_sizes + def detect_ecb(self, probe_value): + """ + Passively detect ECB mode encryption by checking for repeated ciphertext blocks. + ECB encrypts each block independently, so identical plaintext blocks produce identical + ciphertext blocks. Zero HTTP requests required. + """ + data, encoding = self.format_agnostic_decode(probe_value) + if encoding == "unknown": + return + for block_size in self.possible_block_sizes(len(data)): + blocks = [data[i : i + block_size] for i in range(0, len(data), block_size)] + if len(blocks) != len(set(blocks)): + context = f"Lightfuzz Cryptographic Probe Submodule detected ECB mode encryption in parameter: [{self.event.data['name']}]" + self.results.append( + { + "severity": "MEDIUM", + "name": "ECB Mode Encryption Detected", + "confidence": "MEDIUM", + "description": f"ECB Mode Encryption Detected. Block size: [{block_size}] {self.metadata()}", + "context": context, + } + ) + return # Report first matching block size only + + async def cbc_bitflip(self, probe_value, cookies): + """ + Detect CBC bit-flipping vulnerability by mutating different byte positions in the + penultimate ciphertext block. In CBC mode, modifying byte N of block K affects byte N + of the decrypted block K+1. If the server produces distinguishable responses for + different mutation positions, it indicates CBC without integrity protection. + Cost: 2 HTTP requests. + """ + data, encoding = self.format_agnostic_decode(probe_value) + if encoding == "unknown": + return + # Stability pre-check: verify the endpoint returns consistent responses + if not await self._check_endpoint_stability(probe_value, encoding, cookies): + return + for block_size in self.possible_block_sizes(len(data)): + num_blocks = len(data) // block_size + if num_blocks < 2: + continue + penultimate_start = (num_blocks - 2) * block_size + # Mutate first byte of penultimate block + try: + probe_a = self.modify_string(probe_value, action="mutate", position=penultimate_start) + except ValueError: + continue + # Mutate middle byte of penultimate block + try: + probe_b = self.modify_string( + probe_value, action="mutate", position=penultimate_start + block_size // 2 + ) + except ValueError: + continue + # Use probe_a as the baseline, compare probe_b against it + http_compare = self.compare_baseline(self.event.data["type"], probe_a, cookies) + try: + result = await self.compare_probe(http_compare, self.event.data["type"], probe_b, cookies) + except HttpCompareError as e: + self.verbose(f"Encountered HttpCompareError during CBC bit-flip test: {e}") + continue + if result[0] is False and "body" in result[1]: + # Strip reflected probe values to avoid false positives + stripped_baseline = http_compare.baseline.text + stripped_probe = result[3].text + for encoded_a, encoded_b in [ + (probe_a, probe_b), + (probe_a.replace("+", " "), probe_b.replace("+", " ")), + (quote(probe_a), quote(probe_b)), + ]: + stripped_baseline = stripped_baseline.replace(encoded_a, "") + stripped_probe = stripped_probe.replace(encoded_b, "") + if stripped_baseline == stripped_probe: + continue + context = f"Lightfuzz Cryptographic Probe Submodule detected probable CBC bit-flipping in parameter: [{self.event.data['name']}]" + self.results.append( + { + "severity": "MEDIUM", + "name": "CBC Bit-Flipping Detected", + "confidence": "MEDIUM", + "description": f"CBC Bit-Flipping Detected. Block size: [{block_size}] {self.metadata()}", + "context": context, + } + ) + return # Report first matching block size only + async def padding_oracle_execute(self, original_data, encoding, block_size, cookies, possible_first_byte=True): """ Execute the padding oracle attack for a given block size. @@ -300,12 +392,42 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook return None return False + async def _check_endpoint_stability(self, probe_value, encoding, cookies): + """Send the same probe value multiple times and verify the endpoint returns consistent responses. + Returns True if stable, False if responses vary for identical inputs (jitter).""" + data, _ = self.format_agnostic_decode(probe_value) + if encoding == "unknown": + return True + # Build a fixed probe to test stability + stability_value = self.format_agnostic_encode(b"\x00" * 16 + data[-16:], encoding) + stability_hashes = [] + for _ in range(3): + r = await self.standard_probe(self.event.data["type"], cookies, stability_value) + if r: + body = r.text + for encoded in [stability_value, stability_value.replace("+", " "), quote(stability_value)]: + body = body.replace(encoded, "") + stability_hashes.append(hash(body)) + if len(set(stability_hashes)) > 1: + self.debug( + f"Endpoint produces unstable responses for identical inputs " + f"({len(set(stability_hashes))}/{len(stability_hashes)} unique), " + f"skipping differential analysis" + ) + return False + return True + async def padding_oracle(self, probe_value, cookies): data, encoding = self.format_agnostic_decode(probe_value) possible_block_sizes = self.possible_block_sizes( len(data) ) # determine possible block sizes for the ciphertext + # Stability pre-check: verify the endpoint returns consistent responses + # for identical inputs before attempting differential analysis + if not await self._check_endpoint_stability(probe_value, encoding, cookies): + return + for block_size in possible_block_sizes: padding_oracle_result = await self.padding_oracle_execute(data, encoding, block_size, cookies) # if we get a negative result first, theres a 1/255 change it's a false negative. To rule that out, we must retry again with possible_first_byte set to false @@ -428,6 +550,10 @@ async def fuzz(self): self.debug("Parameter value does not appear to be cryptographic, aborting tests") return + # ECB Mode Detection (passive, zero HTTP requests) + if possible_block_cipher: + self.detect_ecb(probe_value) + # Cryptographic Response Divergence Test http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies) @@ -488,6 +614,9 @@ async def fuzz(self): ) await self.padding_oracle(probe_value, cookies) + # CBC Bit-Flipping Test + await self.cbc_bitflip(probe_value, cookies) + # Hash identification / Potential Length extension attack data, encoding = crypto.format_agnostic_decode(probe_value) # see if its possible that a given value is a hash, and if so, which one diff --git a/bbot/modules/lightfuzz/submodules/path.py b/bbot/modules/lightfuzz/submodules/path.py index eb866eac8f..b3bda83596 100644 --- a/bbot/modules/lightfuzz/submodules/path.py +++ b/bbot/modules/lightfuzz/submodules/path.py @@ -113,7 +113,9 @@ async def fuzz(self): and doubledot_probe[0] is False and doubledot_probe[3] is not None and doubledot_probe[1] != ["header"] - and "The requested URL was rejected" not in doubledot_probe[3].text + and not await self.lightfuzz.helpers.yara.match( + self.lightfuzz.waf_yara_rules, doubledot_probe[3].text + ) ): confirmations += 1 self.verbose(f"Got possible Path Traversal detection: [{str(confirmations)}] Confirmations") diff --git a/bbot/modules/lightfuzz/submodules/serial.py b/bbot/modules/lightfuzz/submodules/serial.py index 508fe98ab9..b3fd4bd915 100644 --- a/bbot/modules/lightfuzz/submodules/serial.py +++ b/bbot/modules/lightfuzz/submodules/serial.py @@ -47,12 +47,15 @@ class serial(BaseLightfuzz): "java.io.optionaldataexception", ] - GENERAL_ERRORS = [ + GENERAL_ERROR_STRINGS = [ "Internal Error", "Internal Server Error", - "The requested URL was rejected", ] + @property + def general_error_yara_rules(self): + return self.lightfuzz.serial_general_error_yara_rules + def is_possibly_serialized(self, value): # Use the is_base64 method from BaseLightfuzz via self if self.is_base64(value): @@ -101,7 +104,6 @@ async def fuzz(self): php_raw_serialization_payloads = self.PHP_RAW_SERIALIZATION_PAYLOADS serialization_errors = self.SERIALIZATION_ERRORS - general_errors = self.GENERAL_ERRORS probe_value = self.incoming_probe_value(populate_empty=False) if probe_value: @@ -161,12 +163,24 @@ async def fuzz(self): # if the status code changed to 200, and the response doesn't match our general error exclusions, we have a finding self.debug(f"Potential finding detected for {payload_type}, needs confirmation") + + baseline_status = payload_baseline.baseline.status_code + # Skip Error Resolution if baseline uses a non-standard HTTP status code (>511). + # Non-standard codes (e.g. 512 from GlobalProtect) are application-specific + # and don't reliably indicate an error state that deserialization could "resolve". + if baseline_status > 511: + self.debug( + f"Baseline status {baseline_status} is non-standard (>511), skipping Error Resolution for {payload_type}" + ) + continue + + general_error_matches = await self.lightfuzz.helpers.yara.match( + self.general_error_yara_rules, response.text + ) if ( status_code == 200 and "code" in diff_reasons - and not any( - error in response.text for error in general_errors - ) # ensure the 200 is not actually an error + and not general_error_matches # ensure the 200 is not actually an error ): # Confirm the baseline error state is stable by re-sending the control payload. # If the control also returns 200 now, the original error was transient. diff --git a/bbot/modules/lightfuzz/submodules/sqli.py b/bbot/modules/lightfuzz/submodules/sqli.py index 4d2633f34f..04a0d803d7 100644 --- a/bbot/modules/lightfuzz/submodules/sqli.py +++ b/bbot/modules/lightfuzz/submodules/sqli.py @@ -119,14 +119,27 @@ async def fuzz(self): if "code" in single_quote[1] and ( single_quote[3].status_code != double_single_quote[3].status_code ): - self.results.append( - { - "name": "Possible SQL Injection", - "severity": "HIGH", - "confidence": "MEDIUM", - "description": f"Possible SQL Injection. {self.metadata()} Detection Method: [Single Quote/Two Single Quote, Code Change ({http_compare.baseline.status_code}->{single_quote[3].status_code}->{double_single_quote[3].status_code})]", - } - ) + # Check if the status code change is due to a WAF, not SQL injection + is_waf = False + if single_quote[3].status_code == 403: + waf_matches = await self.lightfuzz.helpers.yara.match( + self.lightfuzz.waf_yara_rules, single_quote[3].text + ) + if waf_matches: + self.debug( + "Single quote probe returned 403 with WAF signature, " + "suppressing SQL injection finding" + ) + is_waf = True + if not is_waf: + self.results.append( + { + "name": "Possible SQL Injection", + "severity": "HIGH", + "confidence": "MEDIUM", + "description": f"Possible SQL Injection. {self.metadata()} Detection Method: [Single Quote/Two Single Quote, Code Change ({http_compare.baseline.status_code}->{single_quote[3].status_code}->{double_single_quote[3].status_code})]", + } + ) else: self.debug("Failed to get responses for both single_quote and double_single_quote") except HttpCompareError as e: diff --git a/bbot/modules/lightfuzz/submodules/xss.py b/bbot/modules/lightfuzz/submodules/xss.py index 827afc33e5..c61e6f413e 100644 --- a/bbot/modules/lightfuzz/submodules/xss.py +++ b/bbot/modules/lightfuzz/submodules/xss.py @@ -84,21 +84,62 @@ def is_balanced(section, target_index, quote_char): # If we have no matches, the target string is most likely not within quotes return "outside" + def _verify_match_context(self, html, match, context): + """Verify the match appears in the correct HTML context, not just anywhere in the response. + When the same parameter is reflected in multiple contexts with different encoding, + a match found in the wrong context can cause false positives.""" + if "Tag Attribute" in context: + # Verify match is inside a tag (between < and >), not in text content + pos = html.find(match) + while pos != -1: + preceding = html[:pos] + last_open = preceding.rfind("<") + last_close = preceding.rfind(">") + if last_open > last_close: + return True + pos = html.find(match, pos + 1) + return False + elif "Between Tags" in context: + pos = html.find(match) + while pos != -1: + preceding = html[:pos] + last_open = preceding.rfind("<") + last_close = preceding.rfind(">") + if last_close > last_open: + return True + pos = html.find(match, pos + 1) + return False + elif "In Javascript" in context: + in_js_regex = re.compile( + rf"]*>[^<]*(?:<(?!\/script>)[^<]*)*{re.escape(match)}" + rf"[^<]*(?:<(?!\/script>)[^<]*)*<\/script>" + ) + return bool(in_js_regex.search(html)) + return True + async def check_probe(self, cookies, probe, match, context): # Send the defined probe and look for the expected match value in the response probe_result = await self.standard_probe(self.event.data["type"], cookies, probe) - if probe_result and match in probe_result.text: - self.results.append( - { - "name": "Possible Reflected XSS", - "severity": "MEDIUM", - "confidence": "MEDIUM", - "type": "FINDING", - "description": f"Possible Reflected XSS. Parameter: [{self.event.data['name']}] Context: [{context}] Parameter Type: [{self.event.data['type']}]{self.conversion_note()}", - } + if not probe_result or match not in probe_result.text: + return False + + if not self._verify_match_context(probe_result.text, match, context): + self.debug( + f"Probe match found in response but not in the expected context [{context}]. " + f"Likely reflected in a different context with different encoding. Suppressing." ) - return True - return False + return False + + self.results.append( + { + "name": "Possible Reflected XSS", + "severity": "MEDIUM", + "confidence": "MEDIUM", + "type": "FINDING", + "description": f"Possible Reflected XSS. Parameter: [{self.event.data['name']}] Context: [{context}] Parameter Type: [{self.event.data['type']}]{self.conversion_note()}", + } + ) + return True async def fuzz(self): lightfuzz_event = self.event.parent diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 6d56911723..b325553590 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -1570,3 +1570,37 @@ def check(self, module_test, events): e for e in events if e.type == "FINDING" and "ftp://ftp.test.notreal" in e.data.get("description", "") ] assert len(ftp_findings) == 0, f"PDF body should not produce findings, but got: {ftp_findings}" + + +class TestExcavateRedirectParameterScope(ModuleTestBase): + """Verify that parameter extraction is skipped for out-of-scope redirect targets. + + When an in-scope HTTP response has a Location header pointing to an external + out-of-scope domain, the redirect URL's query parameters should NOT be emitted + as WEB_PARAMETER events, because they would inherit the in-scope parent's scope + distance and cause lightfuzz to fuzz external endpoints. + """ + + targets = ["http://127.0.0.1:8888/"] + modules_overrides = ["http", "excavate", "hunt"] + + async def setup_before_prep(self, module_test): + module_test.httpserver.expect_request("/").respond_with_data( + "", + status=302, + headers={"Location": "https://login.microsoftonline.com/oauth2/authorize?state=abc123&client_id=test456"}, + ) + + def check(self, module_test, events): + # The redirect URL itself should be emitted as URL_UNVERIFIED (that's correct behavior) + assert any(e.type == "URL_UNVERIFIED" and "login.microsoftonline.com" in e.url for e in events), ( + "Redirect URL_UNVERIFIED should still be emitted" + ) + + # But NO WEB_PARAMETER events should be emitted for the out-of-scope redirect's parameters + redirect_params = [ + e for e in events if e.type == "WEB_PARAMETER" and "login.microsoftonline.com" in e.data.get("url", "") + ] + assert len(redirect_params) == 0, ( + f"Out-of-scope redirect parameters should not be extracted, but got: {redirect_params}" + ) diff --git a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py index 9e0fa18d37..4de37b9fe6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py +++ b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py @@ -1385,6 +1385,38 @@ def check(self, module_test, events): ) +class Test_Lightfuzz_serial_errorresolution_nonstandard_status(Test_Lightfuzz_serial_errorresolution): + """A baseline with a non-standard status code (>511, e.g. GlobalProtect's 512) + should not produce an Error Resolution finding even if the probe returns 200.""" + + def request_handler(self, request): + dotnet_serial_error_resolved = ( + "Deserialization successful! Object type: System.String" + ) + post_params = request.form + + if "TextBox1" not in post_params.keys(): + return Response(self.dotnet_serial_html, status=200) + + else: + if post_params["__VIEWSTATE"] != "/wEPDwULLTE5MTI4MzkxNjVkZNt7ICM+GixNryV6ucx+srzhXlwP": + # Non-standard status code (like GlobalProtect's 512) + return Response(self.dotnet_serial_error, status=512) + if post_params["TextBox1"] == "AAEAAAD/////AQAAAAAAAAAGAQAAAAdndXN0YXZvCw==": + return Response(dotnet_serial_error_resolved, status=200) + else: + return Response(self.dotnet_serial_error, status=512) + + def check(self, module_test, events): + no_finding_emitted = True + for e in events: + if e.type == "FINDING" and "Error Resolution" in e.data.get("description", ""): + no_finding_emitted = False + assert no_finding_emitted, ( + "False positive Error Resolution finding was emitted for non-standard baseline status code (>511)" + ) + + # CMDi echo canary class Test_Lightfuzz_cmdi(ModuleTestBase): targets = ["http://127.0.0.1:8888"] @@ -1687,7 +1719,7 @@ def request_handler(self, request): """ crypto_block = """
-

Access Denied!

+

Padding is invalid


""" @@ -2338,6 +2370,284 @@ class Test_Lightfuzz_envelope_isolation_paddingoracle_reflecting(Test_Lightfuzz_ } +# ECB Mode Detection: ciphertext with repeated 16-byte blocks (A+B+A+B pattern) +class Test_Lightfuzz_ECBDetection(ModuleTestBase): + targets = ["http://127.0.0.1:8888/"] + modules_overrides = ["http", "excavate", "lightfuzz"] + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": {"enabled_submodules": ["crypto"]}, + }, + } + + # Two 16-byte blocks with non-overlapping byte values (high entropy), repeated A+B+A+B + _block_a = bytes(range(16)).hex() # 000102...0f + _block_b = bytes(range(128, 144)).hex() # 808182...8f + ecb_ciphertext_hex = _block_a + _block_b + _block_a + _block_b + + def request_handler(self, request): + qs = str(request.query_string.decode()) + parameter_block = f""" +
+
+ + +
+
+ """ + if "token=" in qs: + return Response("OK", status=200) + return Response(parameter_block, status=200) + + async def setup_after_prep(self, module_test): + module_test.scan.modules["lightfuzz"].helpers.rand_string = lambda *args, **kwargs: "AAAAAAAAAAAAAA" + expect_args = re.compile("/") + module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) + + def check(self, module_test, events): + ecb_detected = False + for e in events: + if e.type == "FINDING": + if "ECB Mode Encryption Detected" in e.data["description"]: + ecb_detected = True + assert ecb_detected, "ECB Mode Encryption FINDING not emitted" + + +# ECB Negative: all unique blocks, should NOT detect ECB +class Test_Lightfuzz_ECBDetection_Negative(ModuleTestBase): + targets = ["http://127.0.0.1:8888/"] + modules_overrides = ["http", "excavate", "lightfuzz"] + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": {"enabled_submodules": ["crypto"]}, + }, + } + + # 4 unique 16-byte blocks with non-overlapping byte values (high entropy, no repeats) + unique_ciphertext_hex = ( + bytes(range(0, 16)).hex() + + bytes(range(64, 80)).hex() + + bytes(range(128, 144)).hex() + + bytes(range(192, 208)).hex() + ) + + def request_handler(self, request): + qs = str(request.query_string.decode()) + parameter_block = f""" +
+
+ + +
+
+ """ + if "token=" in qs: + return Response("OK", status=200) + return Response(parameter_block, status=200) + + async def setup_after_prep(self, module_test): + module_test.scan.modules["lightfuzz"].helpers.rand_string = lambda *args, **kwargs: "AAAAAAAAAAAAAA" + expect_args = re.compile("/") + module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) + + def check(self, module_test, events): + for e in events: + if e.type == "FINDING": + assert "ECB Mode Encryption Detected" not in e.data["description"], ( + "ECB falsely detected on unique blocks" + ) + + +# CBC Bit-Flipping Detection: server returns different responses for different byte-position mutations +class Test_Lightfuzz_CBCBitflipDetection(ModuleTestBase): + targets = ["http://127.0.0.1:8888"] + modules_overrides = ["http", "excavate", "lightfuzz"] + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": {"enabled_submodules": ["crypto"]}, + }, + } + + # 3 blocks of 16 bytes = 48 bytes, base64-encoded + original_b64 = base64.b64encode(bytes(range(48))).decode() + + def request_handler(self, request): + encrypted_value = quote(self.original_b64) + default_html = f""" + +
+ + +
+ + """ + if "/process" in request.url and request.method == "POST": + if request.form and request.form.get("cipher"): + cipher_val = request.form["cipher"] + try: + raw = base64.b64decode(cipher_val) + except Exception: + return Response("Invalid input", status=200) + # Penultimate block starts at byte 16 + # Check which byte in the penultimate block differs from original + original_raw = bytes(range(48)) + if len(raw) == len(original_raw): + penultimate_start = 16 + for i in range(penultimate_start, penultimate_start + 16): + if raw[i] != original_raw[i]: + # First byte (pos 16) vs middle byte (pos 24) produce different responses + if i < penultimate_start + 8: + return Response("Decryption result: type A", status=200) + else: + return Response("Decryption result: type B", status=200) + return Response("Decryption failed", status=200) + return Response("Decryption failed", status=200) + return Response(default_html, status=200) + + async def setup_after_prep(self, module_test): + module_test.set_expect_requests_handler(expect_args=re.compile(".*"), request_handler=self.request_handler) + + def check(self, module_test, events): + cbc_bitflip_detected = False + for e in events: + if e.type == "FINDING": + if "CBC Bit-Flipping Detected" in e.data["description"]: + cbc_bitflip_detected = True + assert cbc_bitflip_detected, "CBC Bit-Flipping Detected FINDING not emitted" + + +# CBC Bit-Flipping Negative: server returns identical response regardless of mutation position +class Test_Lightfuzz_CBCBitflipDetection_Negative(ModuleTestBase): + targets = ["http://127.0.0.1:8888"] + modules_overrides = ["http", "excavate", "lightfuzz"] + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": {"enabled_submodules": ["crypto"]}, + }, + } + + original_b64 = base64.b64encode(bytes(range(48))).decode() + + def request_handler(self, request): + encrypted_value = quote(self.original_b64) + default_html = f""" + +
+ + +
+ + """ + if "/process" in request.url and request.method == "POST": + return Response("Decryption failed", status=200) + return Response(default_html, status=200) + + async def setup_after_prep(self, module_test): + module_test.set_expect_requests_handler(expect_args=re.compile(".*"), request_handler=self.request_handler) + + def check(self, module_test, events): + for e in events: + if e.type == "FINDING": + assert "CBC Bit-Flipping Detected" not in e.data["description"], ( + "CBC Bit-Flipping falsely detected on identical responses" + ) + + +# CBC Bit-Flipping without Padding Oracle: server never fails decryption (OPENSSL_ZERO_PADDING equivalent). +# Every input produces a unique response derived from the submitted value — no padding validity leaked. +# Padding oracle sends ~254 probes that all get unique responses → differ_count >> block_size → not detected. +# CBC bit-flip probes still produce different responses → detected. +class Test_Lightfuzz_CBCBitflipDetection_NoPaddingOracle(ModuleTestBase): + targets = ["http://127.0.0.1:8888"] + modules_overrides = ["http", "excavate", "lightfuzz"] + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": {"enabled_submodules": ["crypto"]}, + }, + } + + # 3 blocks of 16 bytes = 48 bytes, base64-encoded + original_b64 = base64.b64encode(bytes(range(48))).decode() + + def request_handler(self, request): + encrypted_value = quote(self.original_b64) + default_html = f""" + +
+ + +
+ + """ + if "/process" in request.url and request.method == "POST": + if request.form and request.form.get("cipher"): + cipher_val = request.form["cipher"] + # No error paths — always return content derived from input. + # Simulates OPENSSL_ZERO_PADDING: decryption never fails. + import hashlib + + digest = hashlib.md5(cipher_val.encode()).hexdigest() + return Response(f"Session loaded: {digest}", status=200) + return Response("Session loaded: default", status=200) + return Response(default_html, status=200) + + async def setup_after_prep(self, module_test): + module_test.set_expect_requests_handler(expect_args=re.compile(".*"), request_handler=self.request_handler) + + def check(self, module_test, events): + cbc_bitflip_detected = False + padding_oracle_detected = False + for e in events: + if e.type == "FINDING": + if "CBC Bit-Flipping Detected" in e.data["description"]: + cbc_bitflip_detected = True + if "Padding Oracle Vulnerability" in e.data["description"]: + padding_oracle_detected = True + assert cbc_bitflip_detected, "CBC Bit-Flipping should be detected even without padding oracle" + assert not padding_oracle_detected, "Padding Oracle should NOT be detected when decryption never fails" + + +# Envelope state isolation: ECB detection with all submodules enabled +class Test_Lightfuzz_envelope_isolation_ecb(Test_Lightfuzz_ECBDetection): + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": { + "enabled_submodules": ["sqli", "cmdi", "xss", "path", "ssti", "crypto", "serial", "esi"], + } + }, + } + + +# Envelope state isolation: CBC bit-flip detection with all submodules enabled +class Test_Lightfuzz_envelope_isolation_cbc_bitflip(Test_Lightfuzz_CBCBitflipDetection): + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": { + "enabled_submodules": ["sqli", "cmdi", "xss", "path", "ssti", "crypto", "serial", "esi"], + } + }, + } + + +# Envelope state isolation: CBC bit-flip without padding oracle, all submodules enabled +class Test_Lightfuzz_envelope_isolation_cbc_bitflip_no_po(Test_Lightfuzz_CBCBitflipDetection_NoPaddingOracle): + config_overrides = { + "interactsh_disable": True, + "modules": { + "lightfuzz": { + "enabled_submodules": ["sqli", "cmdi", "xss", "path", "ssti", "crypto", "serial", "esi"], + } + }, + } + + # Test filter_event method with WAF tags class Test_Lightfuzz_filter_event(ModuleTestBase): targets = ["http://127.0.0.1:8888"] @@ -2575,3 +2885,164 @@ def check(self, module_test, events): assert sqli_postparam_converted_finding_emitted, ( "SQLi POSTPARAM (converted from GETPARAM) FINDING not emitted (try_get_as_post failed)" ) + + +# Padding Oracle Jitter Stability Pre-Check +class Test_Lightfuzz_PaddingOracleDetection_JitterStability(Test_Lightfuzz_PaddingOracleDetection): + """Padding oracle negative test: the endpoint produces different response bodies for identical inputs + (e.g. ADFS with embedded timestamps/nonces). The stability pre-check should detect this and skip.""" + + jitter_counter = 0 + + def request_handler(self, request): + encrypted_value = quote( + "dplyorsu8VUriMW/8DqVDU6kRwL/FDk3Q+4GXVGZbo0CTh9YX1YvzZZJrYe4cHxvAICyliYtp1im4fWoOa54Zg==" + ) + default_html_response = f""" + + +
+ + +
+ + + """ + + if "/decrypt" in request.url and request.method == "POST": + # Every response is unique, simulating ADFS-style dynamic content + Test_Lightfuzz_PaddingOracleDetection_JitterStability.jitter_counter += 1 + response_content = f"Error correlation_id={Test_Lightfuzz_PaddingOracleDetection_JitterStability.jitter_counter} nonce=abc{Test_Lightfuzz_PaddingOracleDetection_JitterStability.jitter_counter}" + return Response(response_content, status=200) + else: + return Response(default_html_response, status=200) + + def check(self, module_test, events): + web_parameter_extracted = False + padding_oracle_detected = False + for e in events: + if e.type == "WEB_PARAMETER": + if "HTTP Extracted Parameter [encrypted_data] (POST Form" in e.data["description"]: + web_parameter_extracted = True + if e.type == "FINDING": + if "Padding Oracle" in e.data["description"]: + padding_oracle_detected = True + + assert web_parameter_extracted, "Web parameter was not extracted" + assert not padding_oracle_detected, ( + "Padding oracle should NOT be detected when endpoint has jittery responses (stability pre-check should abort)" + ) + + +# XSS Multi-Context Reflection False Positive +class Test_Lightfuzz_xss_multicontext(Test_Lightfuzz_xss): + """XSS negative test: parameter reflected in multiple contexts with different encoding. + Quote survives in text content but is encoded in tag attribute. Should NOT report Tag Attribute XSS.""" + + def request_handler(self, request): + qs = str(request.query_string.decode()) + + parameter_block = """ + +
+ + +
+ + """ + if "path=" in qs: + value = qs.split("path=")[1] + if "&" in value: + value = value.split("&")[0] + decoded = unquote(value) + # Tag attribute context: quotes are URL-encoded (safe) + attr_value = decoded.replace('"', "%22") + # Text content: raw reflection (quotes survive but harmless here) + text_value = decoded + # JS context: everything URL-encoded (safe) + js_value = value + + html = f""" + +
+ +
+

{text_value}

+ + + """ + return Response(html, status=200) + return Response(parameter_block, status=200) + + async def setup_after_prep(self, module_test): + module_test.scan.modules["lightfuzz"].helpers.rand_string = lambda *args, **kwargs: "AAAAAAAAAAAAAA" + expect_args = re.compile("/") + module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) + + def check(self, module_test, events): + web_parameter_emitted = False + tag_attribute_xss_emitted = False + for e in events: + if e.type == "WEB_PARAMETER": + if "HTTP Extracted Parameter [path]" in e.data["description"]: + web_parameter_emitted = True + if e.type == "FINDING": + if "Possible Reflected XSS" in e.data["description"] and "Tag Attribute" in e.data["description"]: + tag_attribute_xss_emitted = True + + assert web_parameter_emitted, "WEB_PARAMETER was not emitted" + assert not tag_attribute_xss_emitted, ( + "Tag Attribute XSS should NOT be reported when the quote only survives in text content, not in tag attributes" + ) + + +# SQLi WAF False Positive (Akamai-style 403) +class Test_Lightfuzz_sqli_waf(Test_Lightfuzz_sqli): + """SQLi negative test: endpoint returns 403 with WAF signature when single quote is sent. + Should NOT report SQL injection.""" + + def request_handler(self, request): + qs = str(request.query_string.decode()) + parameter_block = """ + + """ + if "search=" in qs: + value = qs.split("=")[1] + if "&" in value: + value = value.split("&")[0] + + if value.endswith("'") and not value.endswith("''"): + # WAF blocks the request with a known WAF string + waf_response = """ + + Access Denied + +

Access Denied

+

The requested URL was rejected. Please consult with your administrator.

+ + + """ + return Response(waf_response, status=403) + return Response(parameter_block, status=200) + return Response(parameter_block, status=200) + + def check(self, module_test, events): + web_parameter_emitted = False + sqli_finding_emitted = False + for e in events: + if e.type == "WEB_PARAMETER": + if "HTTP Extracted Parameter [search]" in e.data["description"]: + web_parameter_emitted = True + if e.type == "FINDING": + if "Possible SQL Injection" in e.data["description"] and "Code Change" in e.data["description"]: + sqli_finding_emitted = True + + assert web_parameter_emitted, "WEB_PARAMETER was not emitted" + assert not sqli_finding_emitted, ( + "SQLi should NOT be reported when single quote probe triggers a WAF 403 response" + )