Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2691,6 +2691,11 @@ def get_waf_strings():
return [
"The requested URL was rejected",
"This content has been blocked",
"You don't have permission to access ",
"The URL you requested has been blocked",
"Request unsuccessful. Incapsula incident",
"Access Denied - Sucuri Website Firewall",
"Attention Required! | Cloudflare",
]


Expand Down
18 changes: 18 additions & 0 deletions bbot/defaults.yml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,24 @@ parameter_blacklist:
- PHPSESSID
- __cf_bm
- f5_cspm
# CSRF / Anti-Forgery tokens
- authenticity_token
- csrfmiddlewaretoken
- __RequestVerificationToken
- antiforgerytoken
- __csrf_magic
- _wpnonce
# ASP.NET session/identity cookies
- .ASPXANONYMOUS
- .ASPXAUTH
# PKCE (Proof Key for Code Exchange)
- code_verifier
- code_challenge
# Akamai Bot Manager
- _abck
- bm_sz
- bm_sv
- ak_bmsc

parameter_blacklist_prefixes:
- TS01
Expand Down
13 changes: 9 additions & 4 deletions bbot/modules/bypass403.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from bbot.errors import HttpCompareError
from bbot.modules.base import BaseModule
from bbot.core.helpers.misc import get_waf_strings

"""
Port of https://github.com/iamj0ker/bypass-403/ and https://portswigger.net/bappstore/444407b96d9c4de0adb7aed89e826122
Expand Down Expand Up @@ -80,6 +81,10 @@ class bypass403(BaseModule):
meta = {"description": "Check 403 pages for common bypasses", "created_date": "2022-07-05", "author": "@liquidsec"}
in_scope_only = True

async def setup(self):
self.waf_yara_rules = self.helpers.yara.compile_strings(get_waf_strings(), nocase=True)
return True

async def do_checks(self, compare_helper, event, collapse_threshold):
results = set()
error_count = 0
Expand All @@ -105,10 +110,10 @@ async def do_checks(self, compare_helper, event, collapse_threshold):

# In some cases WAFs will respond with a 200 code which causes a false positive
if subject_response is not None:
for waf_string in self.helpers.get_waf_strings():
if waf_string in subject_response.text:
self.debug("Rejecting result based on presence of WAF string")
return
waf_matches = await self.helpers.yara.match(self.waf_yara_rules, subject_response.text)
if waf_matches:
self.debug("Rejecting result based on presence of WAF string")
return

if match is False:
if str(subject_response.status_code)[0] != "4":
Expand Down
50 changes: 30 additions & 20 deletions bbot/modules/internal/excavate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,26 +1250,36 @@ async def handle_event(self, event, **kwargs):

# Try to extract parameters from the redirect URL
if self.parameter_extraction:
for (
method,
parsed_url,
parameter_name,
original_value,
regex_name,
additional_params,
) in extract_params_location(header_value, event.parsed_url):
if self.in_bl(parameter_name) is False:
await self.emit_web_parameter(
host=parsed_url.hostname,
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
url=self.url_unparse("GETPARAM", parsed_url),
description=f"HTTP Extracted Parameter [{parameter_name}] (Location Header)",
additional_params=additional_params,
event=event,
context=f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it",
)
# Don't extract parameters from out-of-scope redirects —
# they would inherit in-scope status from the parent event
# and cause lightfuzz to fuzz external endpoints
redirect_parsed = urlparse(redirect_location)
redirect_host = redirect_parsed.hostname
if redirect_host and not self.scan.in_scope(redirect_host):
self.debug(
f"Skipping parameter extraction from out-of-scope redirect to {redirect_host}"
)
else:
for (
method,
parsed_url,
parameter_name,
original_value,
regex_name,
additional_params,
) in extract_params_location(header_value, event.parsed_url):
if self.in_bl(parameter_name) is False:
await self.emit_web_parameter(
host=parsed_url.hostname,
param_type="GETPARAM",
name=parameter_name,
original_value=original_value,
url=self.url_unparse("GETPARAM", parsed_url),
description=f"HTTP Extracted Parameter [{parameter_name}] (Location Header)",
additional_params=additional_params,
event=event,
context=f"Excavate parsed a location header for parameters and found [GETPARAM] Parameter Name: [{parameter_name}] and emitted a WEB_PARAMETER for it",
)
else:
self.warning("location header found but missing redirect_location in HTTP_RESPONSE")
if header.lower() == "content-type":
Expand Down
10 changes: 10 additions & 0 deletions bbot/modules/lightfuzz/lightfuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from bbot.modules.base import BaseModule

from bbot.errors import InteractshError
from bbot.core.helpers.misc import get_waf_strings


class lightfuzz(BaseModule):
Expand Down Expand Up @@ -60,6 +61,15 @@ async def setup(self):
return False, f"Invalid Lightfuzz submodule ({submodule_name}) specified in enabled_modules"
self.submodules[submodule_name] = submodule_class

waf_strings = get_waf_strings()
self.waf_yara_rules = self.helpers.yara.compile_strings(waf_strings, nocase=True)
# Serial submodule needs WAF + general error strings in one rule
from bbot.modules.lightfuzz.submodules.serial import serial

self.serial_general_error_yara_rules = self.helpers.yara.compile_strings(
serial.GENERAL_ERROR_STRINGS + waf_strings, nocase=True
)

interactsh_needed = any(submodule.uses_interactsh for submodule in self.submodules.values())
if interactsh_needed and not self.interactsh_disable:
try:
Expand Down
135 changes: 132 additions & 3 deletions bbot/modules/lightfuzz/submodules/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class crypto(BaseLightfuzz):
* Padding Oracle Vulnerabilities:
- Identifies the presence of cryptographic oracles that could be exploited to arbitrary decrypt or encrypt data for the parameter value.

* ECB Mode Detection:
- Passively detects ECB mode encryption by checking for repeated ciphertext blocks in parameter values (zero HTTP requests).

* CBC Bit-Flipping Detection:
- Actively tests whether mutating different byte positions in the penultimate ciphertext block produces distinguishable server responses,
indicating CBC mode without integrity protection (2 HTTP requests).

"""

Expand Down Expand Up @@ -63,7 +69,6 @@ def is_base64(s):
"key does not exist",
"the parameter is incorrect",
"cryptography exception",
"access denied",
"unknown error",
"invalid provider type",
"no valid cert found",
Expand Down Expand Up @@ -174,7 +179,7 @@ def modify_string(input_string, action="truncate", position=None, extension_leng
if action == "truncate":
modified_data = data[:-1] # Remove the last byte
elif action == "mutate":
if not position:
if position is None:
position = len(data) // 2
if position < 0 or position >= len(data):
raise ValueError("Position out of range")
Expand All @@ -184,7 +189,7 @@ def modify_string(input_string, action="truncate", position=None, extension_leng
elif action == "extend":
modified_data = data + (b"\x00" * extension_length)
elif action == "flip":
if not position:
if position is None:
position = len(data) // 2
if position < 0 or position >= len(data):
raise ValueError("Position out of range")
Expand Down Expand Up @@ -222,6 +227,93 @@ def possible_block_sizes(ciphertext_length):
possible_sizes.append(block_size)
return possible_sizes

def detect_ecb(self, probe_value):
"""
Passively detect ECB mode encryption by checking for repeated ciphertext blocks.
ECB encrypts each block independently, so identical plaintext blocks produce identical
ciphertext blocks. Zero HTTP requests required.
"""
data, encoding = self.format_agnostic_decode(probe_value)
if encoding == "unknown":
return
for block_size in self.possible_block_sizes(len(data)):
blocks = [data[i : i + block_size] for i in range(0, len(data), block_size)]
if len(blocks) != len(set(blocks)):
context = f"Lightfuzz Cryptographic Probe Submodule detected ECB mode encryption in parameter: [{self.event.data['name']}]"
self.results.append(
{
"severity": "MEDIUM",
"name": "ECB Mode Encryption Detected",
"confidence": "MEDIUM",
"description": f"ECB Mode Encryption Detected. Block size: [{block_size}] {self.metadata()}",
"context": context,
}
)
return # Report first matching block size only

async def cbc_bitflip(self, probe_value, cookies):
"""
Detect CBC bit-flipping vulnerability by mutating different byte positions in the
penultimate ciphertext block. In CBC mode, modifying byte N of block K affects byte N
of the decrypted block K+1. If the server produces distinguishable responses for
different mutation positions, it indicates CBC without integrity protection.
Cost: 2 HTTP requests.
"""
data, encoding = self.format_agnostic_decode(probe_value)
if encoding == "unknown":
return
# Stability pre-check: verify the endpoint returns consistent responses
if not await self._check_endpoint_stability(probe_value, encoding, cookies):
return
for block_size in self.possible_block_sizes(len(data)):
num_blocks = len(data) // block_size
if num_blocks < 2:
continue
penultimate_start = (num_blocks - 2) * block_size
# Mutate first byte of penultimate block
try:
probe_a = self.modify_string(probe_value, action="mutate", position=penultimate_start)
except ValueError:
continue
# Mutate middle byte of penultimate block
try:
probe_b = self.modify_string(
probe_value, action="mutate", position=penultimate_start + block_size // 2
)
except ValueError:
continue
# Use probe_a as the baseline, compare probe_b against it
http_compare = self.compare_baseline(self.event.data["type"], probe_a, cookies)
try:
result = await self.compare_probe(http_compare, self.event.data["type"], probe_b, cookies)
except HttpCompareError as e:
self.verbose(f"Encountered HttpCompareError during CBC bit-flip test: {e}")
continue
if result[0] is False and "body" in result[1]:
# Strip reflected probe values to avoid false positives
stripped_baseline = http_compare.baseline.text
stripped_probe = result[3].text
for encoded_a, encoded_b in [
(probe_a, probe_b),
(probe_a.replace("+", " "), probe_b.replace("+", " ")),
(quote(probe_a), quote(probe_b)),
]:
stripped_baseline = stripped_baseline.replace(encoded_a, "")
stripped_probe = stripped_probe.replace(encoded_b, "")
if stripped_baseline == stripped_probe:
continue
context = f"Lightfuzz Cryptographic Probe Submodule detected probable CBC bit-flipping in parameter: [{self.event.data['name']}]"
self.results.append(
{
"severity": "MEDIUM",
"name": "CBC Bit-Flipping Detected",
"confidence": "MEDIUM",
"description": f"CBC Bit-Flipping Detected. Block size: [{block_size}] {self.metadata()}",
"context": context,
}
)
return # Report first matching block size only

async def padding_oracle_execute(self, original_data, encoding, block_size, cookies, possible_first_byte=True):
"""
Execute the padding oracle attack for a given block size.
Expand Down Expand Up @@ -300,12 +392,42 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook
return None
return False

async def _check_endpoint_stability(self, probe_value, encoding, cookies):
"""Send the same probe value multiple times and verify the endpoint returns consistent responses.
Returns True if stable, False if responses vary for identical inputs (jitter)."""
data, _ = self.format_agnostic_decode(probe_value)
if encoding == "unknown":
return True
# Build a fixed probe to test stability
stability_value = self.format_agnostic_encode(b"\x00" * 16 + data[-16:], encoding)
stability_hashes = []
for _ in range(3):
r = await self.standard_probe(self.event.data["type"], cookies, stability_value)
if r:
body = r.text
for encoded in [stability_value, stability_value.replace("+", " "), quote(stability_value)]:
body = body.replace(encoded, "")
stability_hashes.append(hash(body))
if len(set(stability_hashes)) > 1:
self.debug(
f"Endpoint produces unstable responses for identical inputs "
f"({len(set(stability_hashes))}/{len(stability_hashes)} unique), "
f"skipping differential analysis"
)
return False
return True

async def padding_oracle(self, probe_value, cookies):
data, encoding = self.format_agnostic_decode(probe_value)
possible_block_sizes = self.possible_block_sizes(
len(data)
) # determine possible block sizes for the ciphertext

# Stability pre-check: verify the endpoint returns consistent responses
# for identical inputs before attempting differential analysis
if not await self._check_endpoint_stability(probe_value, encoding, cookies):
return

for block_size in possible_block_sizes:
padding_oracle_result = await self.padding_oracle_execute(data, encoding, block_size, cookies)
# if we get a negative result first, theres a 1/255 change it's a false negative. To rule that out, we must retry again with possible_first_byte set to false
Expand Down Expand Up @@ -428,6 +550,10 @@ async def fuzz(self):
self.debug("Parameter value does not appear to be cryptographic, aborting tests")
return

# ECB Mode Detection (passive, zero HTTP requests)
if possible_block_cipher:
self.detect_ecb(probe_value)

# Cryptographic Response Divergence Test

http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies)
Expand Down Expand Up @@ -488,6 +614,9 @@ async def fuzz(self):
)
await self.padding_oracle(probe_value, cookies)

# CBC Bit-Flipping Test
await self.cbc_bitflip(probe_value, cookies)

# Hash identification / Potential Length extension attack
data, encoding = crypto.format_agnostic_decode(probe_value)
# see if its possible that a given value is a hash, and if so, which one
Expand Down
4 changes: 3 additions & 1 deletion bbot/modules/lightfuzz/submodules/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ async def fuzz(self):
and doubledot_probe[0] is False
and doubledot_probe[3] is not None
and doubledot_probe[1] != ["header"]
and "The requested URL was rejected" not in doubledot_probe[3].text
and not await self.lightfuzz.helpers.yara.match(
self.lightfuzz.waf_yara_rules, doubledot_probe[3].text
)
):
confirmations += 1
self.verbose(f"Got possible Path Traversal detection: [{str(confirmations)}] Confirmations")
Expand Down
Loading
Loading