diff --git a/bbot/modules/waf_bypass.py b/bbot/modules/waf_bypass.py
new file mode 100644
index 0000000000..e552b244c3
--- /dev/null
+++ b/bbot/modules/waf_bypass.py
@@ -0,0 +1,321 @@
+import ipaddress
+
+from radixtarget import RadixTarget
+from bbot.modules.base import BaseModule
+from bbot.core.helpers.simhash import compute_simhash
+
+
+class waf_bypass(BaseModule):
+    """
+    Module to detect WAF bypasses by finding direct IP access to WAF-protected content.
+
+    Overview:
+        Throughout the scan, we collect:
+        1. WAF-protected domains (identified by CloudFlare/Imperva tags) and their SimHash content fingerprints
+        2. All domain->IP mappings from DNS resolution of URL events
+        3. Cloud IPs separately tracked via "cloud-ip" tags
+
+        In finish(), we test if WAF-protected content can be accessed directly via IPs from non-protected domains.
+        Optionally, it explores IP neighbors within the same ASN to find additional bypass candidates.
+    """
+
+    watched_events = ["URL"]
+    produced_events = ["FINDING"]
+    options = {
+        "similarity_threshold": 0.90,
+        "search_ip_neighbors": True,
+        "neighbor_cidr": 24,  # subnet size to explore when gathering neighbor IPs
+    }
+
+    options_desc = {
+        "similarity_threshold": "Similarity threshold for content matching",
+        "search_ip_neighbors": "Also check IP neighbors of the target domain",
+        "neighbor_cidr": "CIDR mask (24-31) used for neighbor enumeration when search_ip_neighbors is true",
+    }
+    flags = ["active", "safe", "web-heavy"]
+    meta = {
+        "description": "Detects potential WAF bypasses",
+        "author": "@liquidsec",
+        "created_date": "2025-09-26",
+    }
+
+    async def setup(self):
+        """Read the module options and initialise per-scan state.
+
+        Returns False (after logging a warning) when neighbor searching is enabled but
+        neighbor_cidr is not an integer between 24 and 31; True otherwise.
+        """
+        # Track protected domains and their potential bypass CIDRs
+        self.protected_domains = {}  # {domain: event} - track protected domains and store their parent events
+        self.domain_ip_map = {}  # {full_domain: set(ips)} - track all IPs for each domain
+        # {url: {simhash, http_code, domain}} - track the content fingerprints for each URL
+        self.content_fingerprints = {}
+        # Keep track of (protected_domain, ip) pairs we have already attempted to bypass
+        self.attempted_bypass_pairs = set()
+        # Keep track of any IPs that came from hosts that are "cloud-ips"
+        self.cloud_ips = set()
+
+        self.similarity_threshold = self.config.get("similarity_threshold", 0.90)
+        self.search_ip_neighbors = self.config.get("search_ip_neighbors", True)
+
+        raw_cidr = self.config.get("neighbor_cidr", 24)
+        try:
+            self.neighbor_cidr = int(raw_cidr)
+        except (TypeError, ValueError):
+            # A non-numeric value is reported by the range check below instead of crashing setup
+            self.neighbor_cidr = None
+
+        if self.search_ip_neighbors and (self.neighbor_cidr is None or not 24 <= self.neighbor_cidr <= 31):
+            self.warning(f"Invalid neighbor_cidr {raw_cidr}. Must be between 24 and 31.")
+            return False
+        return True
+
+    async def filter_event(self, event):
+        """Reject URL events tagged "endpoint"; accept everything else."""
+        if "endpoint" in event.tags:
+            return False, "WAF bypass module only considers directory URLs"
+        return True
+
+    async def handle_event(self, event):
+        """Record the IPs the URL's host resolves to and fingerprint the response of WAF-protected URLs."""
+        domain = str(event.host)
+        url = event.url
+
+        # Store the IPs that each domain (that came from a URL event) resolves to. We have to resolve ourself, since normal BBOT DNS resolution doesn't keep ALL the IPs
+        domain_dns_response = await self.helpers.dns.resolve(domain)
+        if domain_dns_response:
+            domain_ips = self.domain_ip_map.setdefault(domain, set())
+            for ip in domain_dns_response:
+                ip_str = str(ip)
+                # Validate that this is actually an IP address before storing
+                if self.helpers.is_ip(ip_str):
+                    domain_ips.add(ip_str)
+                    self.debug(f"Mapped domain {domain} to IP {ip_str}")
+                    if "cloud-ip" in event.tags:
+                        self.cloud_ips.add(ip_str)
+                        self.debug(f"Added cloud-ip {ip_str} to cloud_ips")
+                else:
+                    self.warning(f"DNS resolution for {domain} returned non-IP result: {ip_str}")
+        else:
+            self.warning(f"DNS resolution failed for {domain}")
+
+        # Detect WAF/CDN protection based on tags
+        provider_name = None
+        if "cdn-cloudflare" in event.tags or "waf-cloudflare" in event.tags:
+            provider_name = "CloudFlare"
+        elif "cdn-imperva" in event.tags:
+            provider_name = "Imperva"
+
+        if provider_name is None:
+            return
+
+        self.debug(f"{provider_name} protection detected via tags: {event.tags}")
+        # Save the full domain and event for WAF-protected URLs, this is necessary to find the appropriate parent event later in .finish()
+        self.protected_domains[domain] = event
+        self.debug(f"Found {provider_name}-protected domain: {domain}")
+
+        response = await self.get_url_content(url)
+        if not response:
+            self.debug(f"Failed to get response from protected URL {url}")
+            return
+
+        if not response.text:
+            self.debug(f"Failed to get content from protected URL {url}")
+            return
+
+        # Store a "simhash" (fuzzy hash) of the response data for later comparison
+        simhash = await self.helpers.run_in_executor_mp(compute_simhash, response.text)
+
+        # The domain is stored alongside the fingerprint so check_ip() can match on the exact host
+        self.content_fingerprints[url] = {
+            "simhash": simhash,
+            "http_code": response.status_code,
+            "domain": domain,
+        }
+        self.debug(f"Stored simhash of response from {url} (content length: {len(response.text)})")
+
+    async def get_url_content(self, url, ip=None):
+        """Fetch a URL, optionally forcing the connection to a specific IP.
+
+        Without ``ip``, only responses with status 200, 301, 302 or 500 are returned. With ``ip``,
+        any truthy response is returned regardless of status. Returns None when there is no
+        usable response or the request raised.
+        """
+        try:
+            if ip:
+                self.debug(f"Fetching with resolve_ip={ip} for {url}")
+                response = await self.helpers.request(url=url, resolve_ip=str(ip))
+                if not response:
+                    self.debug(f"No content returned for {url} via IP {ip}")
+                    return None
+                return response
+
+            response = await self.helpers.request(url=url)
+            if not response:
+                self.debug(f"No response received from {url}")
+                return None
+            if response.status_code in (200, 301, 302, 500):
+                return response
+            self.debug(
+                f"Failed to fetch content from {url} - Status: {response.status_code} (not in allowed list)"
+            )
+            return None
+        except Exception as e:
+            self.debug(f"Error fetching content from {url}: {str(e)}")
+            return None
+
+    async def check_ip(self, ip, source_domain, protected_domain, source_event):
+        """Request a fingerprinted URL of ``protected_domain`` directly from ``ip`` and compare the content.
+
+        Returns ``(matching_url, ip, similarity, source_event)`` when the status code equals the stored
+        one and the SimHash similarity reaches the threshold (exactly 1.0 for 301/302 responses),
+        otherwise None.
+        """
+        # Match on the exact host recorded with the fingerprint. A substring test against the URL
+        # would let "a.test" select a fingerprint that belongs to "aa.test".
+        match = next(
+            (
+                (url, fingerprint)
+                for url, fingerprint in self.content_fingerprints.items()
+                if fingerprint.get("domain") == protected_domain
+            ),
+            None,
+        )
+        if match is None:
+            self.debug(f"No matching URL found for {protected_domain} in stored fingerprints")
+            return None
+        matching_url, original_response = match
+
+        self.verbose(f"Bypass attempt: {protected_domain} via {ip} from {source_domain}")
+
+        bypass_response = await self.get_url_content(matching_url, ip)
+        if not bypass_response:
+            self.debug(f"Failed to get content through IP {ip} for URL {matching_url}")
+            return None
+
+        # Compare status codes first so no SimHash is computed for responses that get discarded anyway
+        if original_response["http_code"] != bypass_response.status_code:
+            self.debug(f"Ignoring code difference {original_response['http_code']} != {bypass_response.status_code}")
+            return None
+
+        bypass_simhash = await self.helpers.run_in_executor_mp(compute_simhash, bypass_response.text or "")
+        similarity = self.helpers.simhash.similarity(original_response["simhash"], bypass_simhash)
+
+        # For redirects, require exact match (1.0), otherwise use configured threshold
+        is_redirect = bypass_response.status_code in (301, 302)
+        required_threshold = 1.0 if is_redirect else self.similarity_threshold
+        return (matching_url, ip, similarity, source_event) if similarity >= required_threshold else None
+
+    async def _neighbor_ips(self, ip):
+        """Return the neighbors of ``ip`` in its /neighbor_cidr network that lie inside its ASN's subnets."""
+        # neighbor_cidr (24-31) is an IPv4 prefix length. Applied to an IPv6 address it describes a
+        # network far too large to enumerate, so neighbor exploration is limited to IPv4.
+        if ipaddress.ip_address(ip).version != 4:
+            self.debug(f"Skipping neighbor search for non-IPv4 address {ip}")
+            return []
+
+        # Get the ASN data for the IP - used later to keep brute force from crossing ASN boundaries
+        asn_data = await self.helpers.asn.ip_to_subnets(str(ip))
+        if not asn_data:
+            return []
+
+        # Build a radix tree of the ASN subnets for the IP
+        asn_subnets_tree = RadixTarget()
+        for subnet in asn_data["subnets"]:
+            asn_subnets_tree.insert(subnet)
+
+        # Generate a network based on the neighbor_cidr option
+        neighbor_net = ipaddress.ip_network(f"{ip}/{self.neighbor_cidr}", strict=False)
+        neighbors = []
+        for neighbor_ip in neighbor_net.hosts():
+            neighbor_ip_str = str(neighbor_ip)
+            # make sure we aren't crossing an ASN boundary with our neighbor exploration
+            if neighbor_ip_str != ip and asn_subnets_tree.search(neighbor_ip_str):
+                neighbors.append(neighbor_ip_str)
+        return neighbors
+
+    async def finish(self):
+        """Try each protected domain against each candidate IP and emit one FINDING per (URL, similarity) group."""
+        self.verbose(f"Found {len(self.protected_domains)} Protected Domains")
+
+        confirmed_bypasses = []  # [(protected_url, matching_ip, similarity, source_event)]
+        ip_bypass_candidates = {}  # {ip: domain}
+        waf_ips = set()
+
+        # First collect the IPs of all the WAF-protected DOMAINS we've seen
+        for protected_domain in self.protected_domains:
+            waf_ips.update(self.domain_ip_map.get(protected_domain, ()))
+
+        # Then collect all the non-WAF-protected IPs we've seen
+        for domain, ips in self.domain_ip_map.items():
+            self.debug(f"Checking IP {ips} from domain {domain}")
+            if domain in self.protected_domains:
+                continue
+            for ip in ips:
+                # Validate that this is actually an IP address before processing
+                if not self.helpers.is_ip(ip):
+                    self.warning(f"Skipping non-IP address '{ip}' found in domain_ip_map for {domain}")
+                    continue
+
+                if ip in waf_ips:
+                    self.debug(f"IP {ip} is in WAF IPS so we don't check as potential bypass")
+                    continue
+
+                ip_bypass_candidates[ip] = domain
+                self.debug(f"Added potential bypass IP {ip} from domain {domain}")
+
+                # if we have IP neighbors searching enabled, and the IP isn't a cloud IP, we can add the IP neighbors to our list of potential bypasses
+                if self.search_ip_neighbors and ip not in self.cloud_ips:
+                    for neighbor_ip in await self._neighbor_ips(ip):
+                        # Don't add the neighbor IP if it's a waf ip or already in the list
+                        if neighbor_ip in waf_ips or neighbor_ip in ip_bypass_candidates:
+                            continue
+                        self.debug(
+                            f"Added Neighbor IP ({ip} -> {neighbor_ip}) as potential bypass IP derived from {domain}"
+                        )
+                        ip_bypass_candidates[neighbor_ip] = domain
+
+        self.verbose(f"\nFound {len(ip_bypass_candidates)} non-WAF IPs to check")
+
+        coros = []
+        new_pairs_count = 0
+
+        for protected_domain, source_event in self.protected_domains.items():
+            for ip, src in ip_bypass_candidates.items():
+                combo = (protected_domain, ip)
+                if combo in self.attempted_bypass_pairs:
+                    continue
+                self.attempted_bypass_pairs.add(combo)
+                new_pairs_count += 1
+                self.debug(f"Checking {ip} for {protected_domain} from {src}")
+                coros.append(self.check_ip(ip, src, protected_domain, source_event))
+
+        self.verbose(
+            f"Checking {new_pairs_count} new bypass pairs (total attempted: {len(self.attempted_bypass_pairs)})..."
+        )
+
+        self.debug(f"about to start {len(coros)} coroutines")
+        async for completed in self.helpers.as_completed(coros):
+            result = await completed
+            if result:
+                confirmed_bypasses.append(result)
+
+        # Aggregate by URL and similarity
+        agg = {}
+        for matching_url, ip, similarity, src_evt in confirmed_bypasses:
+            rec = agg.setdefault((matching_url, similarity), {"ips": [], "event": src_evt})
+            rec["ips"].append(ip)
+
+        for (matching_url, sim_key), data in agg.items():
+            ip_list_str = ", ".join(sorted(set(data["ips"])))
+            await self.emit_event(
+                {
+                    "severity": "MEDIUM",
+                    "confidence": "CONFIRMED",
+                    "name": "WAF Bypass",
+                    "url": matching_url,
+                    "description": f"WAF Bypass Confirmed - Direct IPs: {ip_list_str} for {matching_url}. Similarity {sim_key:.2%}",
+                },
+                "FINDING",
+                data["event"],
+            )
diff --git a/bbot/presets/waf-bypass.yml b/bbot/presets/waf-bypass.yml
new file mode 100644
index 0000000000..4a4b7e06e5
--- /dev/null
+++ b/bbot/presets/waf-bypass.yml
@@ -0,0 +1,19 @@
+description: WAF bypass detection with subdomain enumeration
+
+flags:
+  # enable subdomain enumeration to find potential bypass targets
+  - subdomain-enum
+
+modules:
+  # explicitly enable the waf_bypass module for detection
+  - waf_bypass
+  # ensure http is enabled for web probing
+  - http
+
+config:
+  # waf_bypass module configuration
+  modules:
+    waf_bypass:
+      similarity_threshold: 0.90
+      search_ip_neighbors: true
+      neighbor_cidr: 24
diff --git a/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py b/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py
new file mode 100644
index 0000000000..6e000732e5
--- /dev/null
+++ b/bbot/test/test_step_2/module_tests/test_module_waf_bypass.py
@@ -0,0 +1,138 @@
+from .base import ModuleTestBase
+from bbot.modules.base import BaseModule
+
+
+class TestWAFBypass(ModuleTestBase):
+    targets = ["protected.test", "direct.test"]
+    module_name = "waf_bypass"
+    modules_overrides = ["waf_bypass", "http"]
+    config_overrides = {
+        "scope": {"report_distance": 2},
+        "modules": {"waf_bypass": {"search_ip_neighbors": True, "neighbor_cidr": 30}},
+    }
+
+    PROTECTED_IP = "127.0.0.129"
+    DIRECT_IP = "127.0.0.2"
+
+    api_response_direct = {
+        "asn": 15169,
+        "subnets": ["127.0.0.0/25"],
+        "asn_name": "ACME-ORG",
+        "org": "ACME-ORG",
+        "country": "US",
+    }
+
+    api_response_cloudflare = {
+        "asn": 13335,
+        "asn_name": "CLOUDFLARENET",
+        "country": "US",
+        "ip": "127.0.0.129",
+        "org": "Cloudflare, Inc.",
+        "rir": "ARIN",
+        "subnets": ["127.0.0.128/25"],
+    }
+
+    class DummyModule(BaseModule):
+        watched_events = ["DNS_NAME"]
+        _name = "dummy_module"
+        events_seen = []
+
+        async def handle_event(self, event):
+            if event.data == "protected.test":
+                await self.helpers.sleep(0.5)
+                self.events_seen.append(event.data)
+                url = "http://protected.test:8888/"
+                url_event = self.scan.make_event(
+                    url, "URL", parent=self.scan.root_event, tags=["cdn-cloudflare", "in-scope", "status-200"]
+                )
+                if url_event is not None:
+                    await self.emit_event(url_event)
+
+            elif event.data == "direct.test":
+                await self.helpers.sleep(0.5)
+                self.events_seen.append(event.data)
+                url = "http://direct.test:8888/"
+                url_event = self.scan.make_event(
+                    url, "URL", parent=self.scan.root_event, tags=["in-scope", "status-200"]
+                )
+                if url_event is not None:
+                    await self.emit_event(url_event)
+
+    async def setup_after_prep(self, module_test):
+        await module_test.mock_dns(
+            {
+                "protected.test": {"A": [self.PROTECTED_IP]},
+                "direct.test": {"A": [self.DIRECT_IP]},
+                "": {"A": []},
+            }
+        )
+
+        self.module_test = module_test
+
+        self.dummy_module = self.DummyModule(module_test.scan)
+        module_test.scan.modules["dummy_module"] = self.dummy_module
+
+        # Mock ASN lookups via asndb
+        asn_helper = module_test.scan.helpers.asn
+
+        async def mock_lookup_ip(ip, include_subnets=False):
+            # PROTECTED_IP also starts with "127.0.0.", so it must be matched before the generic prefix check
+            if str(ip) == self.PROTECTED_IP:
+                return self.api_response_cloudflare
+            if str(ip).startswith("127.0.0."):
+                return self.api_response_direct
+            return None
+
+        module_test.monkeypatch.setattr(asn_helper.client, "lookup_ip", mock_lookup_ip)
+
+        expect_args = {"method": "GET", "uri": "/", "headers": {"Host": "protected.test"}}
+        respond_args = {"status": 200, "response_data": "HELLO THERE!"}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+        # Patch WAF bypass get_url_content to control similarity outcome
+        waf_module = module_test.scan.modules["waf_bypass"]
+
+        class FakeResponse:
+            def __init__(self, text, status_code):
+                self.text = text
+                self.status_code = status_code
+
+        async def fake_get_url_content(self_waf, url, ip=None):
+            if "protected.test" in url and (ip is None or ip == "127.0.0.1"):
+                return FakeResponse("PROTECTED CONTENT!", 200)
+            else:
+                return FakeResponse("ERROR!", 404)
+
+        import types
+
+        module_test.monkeypatch.setattr(
+            waf_module,
+            "get_url_content",
+            types.MethodType(fake_get_url_content, waf_module),
+            raising=True,
+        )
+
+        # Monkeypatch tldextract so base_domain is never empty
+        def fake_tldextract(domain):
+            import types as _t
+
+            return _t.SimpleNamespace(top_domain_under_public_suffix=domain)
+
+        module_test.monkeypatch.setattr(
+            waf_module.helpers,
+            "tldextract",
+            fake_tldextract,
+            raising=True,
+        )
+
+    def check(self, module_test, events):
+        waf_bypass_events = [e for e in events if e.type == "FINDING"]
+        assert waf_bypass_events, "No FINDING event produced"
+
+        correct_description = [
+            e
+            for e in waf_bypass_events
+            if "WAF Bypass Confirmed - Direct IPs: 127.0.0.1 for http://protected.test:8888/. Similarity 100.00%"
+            in e.data["description"]
+        ]
+        assert correct_description, "Incorrect description"