diff --git a/bbot/modules/virtualhost.py b/bbot/modules/virtualhost.py new file mode 100644 index 0000000000..a0ed4eb7b6 --- /dev/null +++ b/bbot/modules/virtualhost.py @@ -0,0 +1,1013 @@ +from urllib.parse import urlparse +import random +import string + +from bbot.modules.base import BaseModule +from bbot.core.helpers.simhash import compute_simhash + + +class virtualhost(BaseModule): + watched_events = ["URL"] + produced_events = ["VIRTUAL_HOST", "DNS_NAME", "HTTP_RESPONSE"] + flags = ["active", "loud", "slow"] + meta = {"description": "Fuzz for virtual hosts", "created_date": "2022-05-02", "author": "@liquidsec"} + + deps_pip = ["pyOpenSSL~=25.3.0"] + + SIMILARITY_THRESHOLD = 0.8 + CANARY_LENGTH = 12 + MAX_RESULTS_FLOOD_PROTECTION = 50 + + special_virtualhost_list = ["127.0.0.1", "localhost", "host.docker.internal"] + options = { + "brute_wordlist": "https://raw.githubusercontent.com/danielmiessler/SecLists/master/Discovery/DNS/subdomains-top1million-5000.txt", + "force_basehost": "", + "brute_lines": 2000, + "subdomain_brute": True, + "mutation_check": True, + "special_hosts": False, + "certificate_sans": False, + "max_concurrent_requests": 80, + "require_inaccessible": True, + "wordcloud_check": False, + "report_interesting_default_content": True, + } + options_desc = { + "brute_wordlist": "Wordlist containing subdomains", + "force_basehost": "Use a custom base host (e.g. 
evilcorp.com) instead of the default behavior of using the current URL", + "brute_lines": "take only the first N lines from the wordlist when finding directories", + "subdomain_brute": "Enable subdomain brute-force on target host", + "mutation_check": "Enable trying mutations of the target host", + "special_hosts": "Enable testing of special virtual host list (localhost, etc.)", + "certificate_sans": "Enable extraction and testing of Subject Alternative Names from certificates", + "wordcloud_check": "Enable check using scan-wide wordcloud data on target host", + "max_concurrent_requests": "Maximum number of concurrent virtual host requests", + "require_inaccessible": "Only test virtual hosts that are not directly accessible (for discovering hidden content)", + "report_interesting_default_content": "Report interesting default content", + } + + in_scope_only = True + + virtualhost_ignore_strings = [ + "We weren't able to find your Azure Front Door Service", + "The http request header is incorrect.", + ] + + async def setup(self): + self.max_concurrent = self.config.get("max_concurrent_requests", 80) + self.scanned_hosts = {} + self.wordcloud_tried_hosts = set() + self.brute_wordlist = await self.helpers.wordlist( + self.config.get("brute_wordlist"), lines=self.config.get("brute_lines", 2000) + ) + self.similarity_cache = {} # Cache for similarity results + + self.waf_strings = self.helpers.get_waf_strings() + self.virtualhost_ignore_strings + + return True + + def _get_basehost(self, event): + """Get the basehost and subdomain from the event""" + basehost = self.helpers.parent_domain(event.parsed_url.hostname) + if not basehost: + raise ValueError(f"No parent domain found for {event.parsed_url.hostname}") + subdomain = event.parsed_url.hostname.removesuffix(basehost).rstrip(".") + return basehost, subdomain + + async def _get_baseline_response(self, event, normalized_url, host_ip): + """Get baseline response for a host using the appropriate method (HTTPS SNI or HTTP 
Host header)""" + is_https = event.parsed_url.scheme == "https" + host = event.parsed_url.netloc + + if is_https: + port = event.parsed_url.port or 443 + baseline_response = await self.helpers.request( + url=f"https://{host}:{port}/", + resolve_ip=host_ip, + ) + else: + baseline_response = await self.helpers.request( + url=normalized_url, + headers={"Host": host}, + resolve_ip=host_ip, + ) + + return baseline_response + + async def handle_event(self, event): + if not self.helpers.is_ip(event.host) or self.config.get("force_basehost"): + scheme = event.parsed_url.scheme + host = event.parsed_url.netloc + normalized_url = f"{scheme}://{host}" + + # since we normalize the URL to the host level, + if normalized_url in self.scanned_hosts: + return + + self.scanned_hosts[normalized_url] = event + + if self.config.get("force_basehost"): + basehost = self.config.get("force_basehost") + subdomain = "" + else: + basehost, subdomain = self._get_basehost(event) + + is_https = event.parsed_url.scheme == "https" + + if not event.resolved_hosts: + self.debug(f"HANDLE EVENT METHOD: No resolved hosts for {normalized_url}, skipping virtual host check") + return None + + host_ip = str(next(iter(event.resolved_hosts))) + + baseline_response = await self._get_baseline_response(event, normalized_url, host_ip) + if not baseline_response: + self.warning(f"Failed to get baseline response for {normalized_url}") + return None + + if not await self._wildcard_canary_check(scheme, host, event, host_ip, baseline_response): + self.verbose( + f"WILDCARD CHECK FAILED in handle_event: Skipping {normalized_url} - failed virtual host wildcard check" + ) + return None + else: + self.verbose(f"WILDCARD CHECK PASSED in handle_event: Proceeding with {normalized_url}") + + # Phase 1: Main virtual host bruteforce + if self.config.get("subdomain_brute", True): + self.verbose(f"=== Starting subdomain brute-force on {normalized_url} ===") + await self._run_virtualhost_phase( + "Target host Subdomain 
Brute-force", + normalized_url, + basehost, + host_ip, + is_https, + event, + "subdomain", + ) + + # only run mutations if there is an actual subdomain (to mutate) + if subdomain: + # Phase 2: Check existing host for mutations + if self.config.get("mutation_check", True): + self.verbose(f"=== Starting mutations check on {normalized_url} ===") + await self._run_virtualhost_phase( + "Mutations on target host", + normalized_url, + basehost, + host_ip, + is_https, + event, + "mutation", + wordlist=self.mutations_check(subdomain), + ) + + # Phase 3: Special virtual host list + if self.config.get("special_hosts", True): + self.verbose(f"=== Starting special virtual hosts check on {normalized_url} ===") + await self._run_virtualhost_phase( + "Special virtual host list", + normalized_url, + "", + host_ip, + is_https, + event, + "random", + wordlist=self.helpers.tempfile(self.special_virtualhost_list, pipe=False), + skip_dns_host=True, + ) + + # Phase 4: Obtain subject alternate names from certicate and analyze them + if self.config.get("certificate_sans", True): + self.verbose(f"=== Starting certificate SAN analysis on {normalized_url} ===") + if is_https: + subject_alternate_names = await self._analyze_subject_alternate_names(event.data) + if subject_alternate_names: + self.debug( + f"Found {len(subject_alternate_names)} Subject Alternative Names from certificate: {subject_alternate_names}" + ) + + # Use SANs as potential virtual hosts for testing + san_wordlist = self.helpers.tempfile(subject_alternate_names, pipe=False) + await self._run_virtualhost_phase( + "Certificate Subject Alternate Name", + normalized_url, + "", + host_ip, + is_https, + event, + "random", + wordlist=san_wordlist, + skip_dns_host=True, + ) + + async def _analyze_subject_alternate_names(self, url): + """Analyze subject alternate names from certificate via blasthttp cert_info""" + parsed = urlparse(url) + host = parsed.netloc + + response = await self.helpers.request(url=url) + if not response or 
not response.cert_info: + self.debug(f"No certificate data available for {url}") + return [] + + subject_alt_names = [] + try: + for san in response.cert_info.sans: + self.debug(f"Found SAN: {san}") + if san != host and san not in subject_alt_names: + subject_alt_names.append(san) + except Exception as e: + self.warning(f"Error parsing certificate for {url}: {e}") + + self.debug( + f"Found {len(subject_alt_names)} Subject Alternative Names: {subject_alt_names} (besides original target host {host})" + ) + return subject_alt_names + + async def _report_interesting_default_content(self, event, canary_hostname, host_ip, canary_response): + discovery_method = "Interesting Default Content (from intentionally-incorrect canary host)" + # Build URL with explicit authority to avoid double-port issues + authority = ( + f"{event.parsed_url.hostname}:{event.parsed_url.port}" + if event.parsed_url.port is not None + else event.parsed_url.hostname + ) + # Use the explicit canary hostname used in the wildcard request (works for HTTP Host and HTTPS SNI) + canary_host = (canary_hostname or "").split(":")[0] + virtualhost_dict = { + "host": str(event.host), + "url": f"{event.parsed_url.scheme}://{authority}/", + "virtual_host": canary_host, + "description": self._build_description(discovery_method, canary_response, True, host_ip), + "ip": host_ip, + } + + await self.emit_event( + virtualhost_dict, + "VIRTUAL_HOST", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {discovery_method} for {event.data} and found {{event.type}}: {canary_host}", + ) + + # Emit HTTP_RESPONSE event with the canary response data + headers = dict(canary_response.headers) if canary_response.headers else {} + + # Get the scheme from the actual probe URL + probe_url = str(canary_response.url) if canary_response.url else "" + parsed_probe_url = urlparse(probe_url) + actual_scheme = parsed_probe_url.scheme if parsed_probe_url.scheme else "http" + + body = 
canary_response.text or "" + http_response_data = { + "input": canary_host, + "url": f"{actual_scheme}://{canary_host}/", + "method": "GET", + "status_code": canary_response.status_code, + "content_length": len(body), + "body": body, + "header": headers, + "raw_header": "", + } + + # Include location header for redirect handling + location = headers.get("location", "") + if location: + http_response_data["location"] = location + + http_response_event = await self.emit_event( + http_response_data, + "HTTP_RESPONSE", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {discovery_method} for {event.data} and found {{event.type}}: {canary_host}", + ) + # Set scope distance to match parent's scope distance for HTTP_RESPONSE events + if http_response_event: + http_response_event.scope_distance = event.scope_distance + + def _get_canary_random_host(self, host, basehost, mode="subdomain"): + """Generate a random host for the canary""" + # Seed RNG with domain to get consistent canary hosts for same domain + random.seed(host) + + # Generate canary hostname based on mode + if mode == "mutation": + # Prepend random 4-character string with dash to existing hostname + random_prefix = "".join(random.choice(string.ascii_lowercase) for i in range(4)) + canary_host = f"{random_prefix}-{host}" + elif mode == "subdomain": + # Default subdomain mode - add random subdomain + canary_host = "".join(random.choice(string.ascii_lowercase) for i in range(self.CANARY_LENGTH)) + basehost + elif mode == "random_append": + # Append random string to existing hostname (first domain level) + random_suffix = "".join(random.choice(string.ascii_lowercase) for i in range(4)) + canary_host = f"{host.split('.')[0]}{random_suffix}.{'.'.join(host.split('.')[1:])}" + elif mode == "random": + # Fully random hostname with .com TLD + random_host = "".join(random.choice(string.ascii_lowercase) for i in range(self.CANARY_LENGTH)) + canary_host = f"{random_host}.com" + 
else: + raise ValueError(f"Invalid canary mode: {mode}") + + return canary_host + + async def _get_canary_response(self, normalized_url, basehost, host_ip, is_https, mode="subdomain"): + """Setup canary response for comparison using the appropriate technique. Returns canary response or None on failure.""" + + parsed = urlparse(normalized_url) + # Use hostname without port to avoid duplicating port in canary host + host = parsed.hostname or (parsed.netloc.split(":")[0] if ":" in parsed.netloc else parsed.netloc) + + # Seed RNG with domain to get consistent canary hosts for same domain + canary_host = self._get_canary_random_host(host, basehost, mode) + + # Get canary response + if is_https: + port = parsed.port or 443 + canary_response = await self.helpers.request( + url=f"https://{canary_host}:{port}/", + resolve_ip=host_ip, + ) + else: + canary_response = await self.helpers.request( + url=normalized_url, + headers={"Host": canary_host}, + resolve_ip=host_ip, + ) + + return canary_response + + async def _is_host_accessible(self, url): + """ + Check if a URL is already accessible via direct HTTP request. + Returns True if the host is accessible (and should be skipped), False otherwise. 
+ """ + response = await self.helpers.request(url=url) + if response and response.status_code > 0: + return True + return False + + async def _wildcard_canary_check(self, probe_scheme, probe_host, event, host_ip, probe_response): + """Change one char in probe_host and test - if responses are similar, it's probably a wildcard""" + + # Extract hostname and port separately to avoid corrupting the port portion + original_hostname = event.parsed_url.hostname or "" + original_port = event.parsed_url.port + + # Try to mutate the first alphabetic character in the hostname + modified_hostname = None + for i, char in enumerate(original_hostname): + if char.isalpha(): + new_char = "z" if char != "z" else "a" + modified_hostname = original_hostname[:i] + new_char + original_hostname[i + 1 :] + break + + if modified_hostname is None: + # Fallback: generate random hostname of similar length (hostname-only) + modified_hostname = "".join( + random.choice(string.ascii_lowercase) for _ in range(len(original_hostname) or 12) + ) + + # Build modified host strings for each protocol + https_modified_host_for_sni = modified_hostname + http_modified_host_for_header = f"{modified_hostname}:{original_port}" if original_port else modified_hostname + + # Test modified host + if probe_scheme == "https": + port = event.parsed_url.port or 443 + # Log the canary URL for the wildcard SNI test + self.debug( + f"CANARY URL: https://{https_modified_host_for_sni}:{port}/ [phase=wildcard-check, mode=single-char-mutation]" + ) + wildcard_canary_response = await self.helpers.request( + url=f"https://{https_modified_host_for_sni}:{port}/", + resolve_ip=host_ip, + ) + else: + # Log the canary URL for the wildcard Host header test + http_port = event.parsed_url.port or 80 + self.debug( + f"CANARY URL: {probe_scheme}://{http_modified_host_for_header if ':' in http_modified_host_for_header else f'{http_modified_host_for_header}:{http_port}'}/ [phase=wildcard-check, mode=single-char-mutation]" + ) + 
wildcard_canary_response = await self.helpers.request( + url=f"{probe_scheme}://{event.parsed_url.netloc}/", + headers={"Host": http_modified_host_for_header}, + resolve_ip=host_ip, + ) + + if not wildcard_canary_response or wildcard_canary_response.status_code == 0: + self.debug( + f"Wildcard check: {http_modified_host_for_header} failed to respond, assuming {probe_host} is valid" + ) + return True # Modified failed, original probably valid + + # If HTTP status codes differ, consider this a pass (not wildcard) + if probe_response.status_code != wildcard_canary_response.status_code: + self.debug( + f"WILDCARD CHECK OK (status mismatch): {probe_host} ({probe_response.status_code}) vs {http_modified_host_for_header} ({wildcard_canary_response.status_code})" + ) + if ( + self.config.get("report_interesting_default_content", True) + and wildcard_canary_response.status_code == 200 + and len(wildcard_canary_response.text or "") > 40 + ): + canary_hostname = ( + https_modified_host_for_sni if probe_scheme == "https" else http_modified_host_for_header + ) + await self._report_interesting_default_content( + event, canary_hostname, host_ip, wildcard_canary_response + ) + return True + + probe_simhash = await self.helpers.run_in_executor_mp(compute_simhash, probe_response.text or "") + wildcard_simhash = await self.helpers.run_in_executor_mp(compute_simhash, wildcard_canary_response.text or "") + similarity = self.helpers.simhash.similarity(probe_simhash, wildcard_simhash) + + # Compare original probe response with modified response + + result = similarity <= self.SIMILARITY_THRESHOLD + + if not result: + self.debug( + f"WILDCARD DETECTED: {probe_host} vs {http_modified_host_for_header} similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}) -> FAIL (wildcard detected)" + ) + else: + self.debug( + f"WILDCARD CHECK OK: {probe_host} vs {http_modified_host_for_header} similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}) -> PASS (not wildcard)" + ) 
+ if ( + self.config.get("report_interesting_default_content", True) + and wildcard_canary_response.status_code == 200 + and len(wildcard_canary_response.text or "") > 40 + ): + canary_hostname = ( + https_modified_host_for_sni if probe_scheme == "https" else http_modified_host_for_header + ) + await self._report_interesting_default_content( + event, canary_hostname, host_ip, wildcard_canary_response + ) + + return result # True if they're different (good), False if similar (wildcard) + + async def _run_virtualhost_phase( + self, + discovery_method, + normalized_url, + basehost, + host_ip, + is_https, + event, + canary_mode, + wordlist=None, + skip_dns_host=False, + ): + """Helper method to run a virtual host discovery phase and optionally mutations""" + + canary_response = await self._get_canary_response( + normalized_url, basehost, host_ip, is_https, mode=canary_mode + ) + + if not canary_response: + self.debug(f"Failed to get canary response for {normalized_url}, skipping virtual host detection") + return [] + + results = await self.curl_virtualhost( + discovery_method, + normalized_url, + basehost, + event, + canary_response, + canary_mode, + wordlist, + skip_dns_host, + ) + + # Emit all valid results + for virtual_host_data in results: + # Emit VIRTUAL_HOST event + await self.emit_event( + virtual_host_data["virtualhost_dict"], + "VIRTUAL_HOST", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {virtual_host_data['probe_host']} (similarity: {virtual_host_data['similarity']:.2%})", + ) + + # Emit HTTP_RESPONSE event with the probe response data + probe_resp = virtual_host_data["probe_response"] + headers = dict(probe_resp.headers) if probe_resp.headers else {} + + # Get the scheme from the actual probe URL + probe_url = str(probe_resp.url) if probe_resp.url else "" + parsed_probe_url = urlparse(probe_url) + actual_scheme = 
parsed_probe_url.scheme if parsed_probe_url.scheme else "http" + + body = probe_resp.text or "" + http_response_data = { + "input": virtual_host_data["probe_host"], + "url": f"{actual_scheme}://{virtual_host_data['probe_host']}/", + "method": "GET", + "status_code": probe_resp.status_code, + "content_length": len(body), + "body": body, + "header": headers, + "raw_header": "", + } + + # Include location header for redirect handling + location = headers.get("location", "") + if location: + http_response_data["location"] = location + + http_response_event = await self.emit_event( + http_response_data, + "HTTP_RESPONSE", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {virtual_host_data['probe_host']}", + ) + # Set scope distance to match parent's scope distance for HTTP_RESPONSE events + if http_response_event: + http_response_event.scope_distance = event.scope_distance + + # Emit DNS_NAME_UNVERIFIED event if needed + if virtual_host_data["skip_dns_host"] is False: + await self.emit_event( + virtual_host_data["virtualhost_dict"]["virtual_host"], + "DNS_NAME_UNVERIFIED", + parent=event, + tags=["virtual-host"], + context=f"{{module}} discovered virtual host via {virtual_host_data['discovery_method']} for {event.data} and found {{event.type}}: {{event.data}}", + ) + + async def curl_virtualhost( + self, + discovery_method, + normalized_url, + basehost, + event, + canary_response, + canary_mode, + wordlist=None, + skip_dns_host=False, + ): + if wordlist is None: + wordlist = self.brute_wordlist + + # Get baseline host for comparison and determine scheme from event + baseline_host = event.parsed_url.netloc + + # Collect all words for concurrent processing + candidates_to_check = [] + for word in self.helpers.read_file(wordlist): + word = word.strip() + if not word: + continue + + # Construct virtual host header + if basehost: + # Wordlist entries 
are subdomain prefixes - append basehost + probe_host = f"{word}.{basehost}" + + else: + # No basehost - use as-is + probe_host = word + + # Skip if this would be the same as the original host + if probe_host == baseline_host: + continue + + candidates_to_check.append(probe_host) + + self.debug(f"Loaded {len(candidates_to_check)} candidates from wordlist for {discovery_method}") + + host_ips = [str(ip) for ip in event.resolved_hosts] + total_tests = len(candidates_to_check) * len(host_ips) + + self.verbose( + f"Initiating {total_tests} virtual host tests ({len(candidates_to_check)} candidates × {len(host_ips)} IPs) with max {self.max_concurrent} concurrent requests" + ) + + # Collect all virtual host results before emitting + virtual_host_results = [] + + # Process results as they complete with concurrency control + try: + # Build coroutines on-demand without wrapper + coroutines = ( + self._test_virtualhost( + normalized_url, + probe_host, + basehost, + event, + canary_response, + canary_mode, + skip_dns_host, + host_ip, + discovery_method, + ) + for host_ip in host_ips + for probe_host in candidates_to_check + ) + + async for completed in self.helpers.as_completed(coroutines, self.max_concurrent): + try: + result = await completed + except Exception as e: + if getattr(self.scan, "stopping", False) or getattr(self.scan, "aborting", False): + self.debug(f"CurlError during shutdown (suppressed): {e}") + break + self.debug(f"CurlError in virtualhost test (skipping this test): {e}") + continue + if result: # Only append non-None results + virtual_host_results.append(result) + self.debug( + f"ADDED RESULT {len(virtual_host_results)}: {result['probe_host']} (similarity: {result['similarity']:.3f}) [Status: {result['status_code']} | Size: {result['content_length']} bytes]" + ) + + # Early exit if we're clearly hitting false positives + if len(virtual_host_results) >= self.MAX_RESULTS_FLOOD_PROTECTION: + self.warning( + f"RESULT FLOOD DETECTED: found 
{len(virtual_host_results)} virtual hosts (limit: {self.MAX_RESULTS_FLOOD_PROTECTION}), likely false positives - stopping further tests and skipping reporting" + ) + break + + except Exception as e: + if getattr(self.scan, "stopping", False) or getattr(self.scan, "aborting", False): + self.debug(f"CurlError in as_completed during shutdown (suppressed): {e}") + return [] + self.warning(f"CurlError in as_completed, stopping all tests: {e}") + return [] + + # Return results for emission at _run_virtualhost_phase level + return virtual_host_results + + async def _test_virtualhost( + self, + normalized_url, + probe_host, + basehost, + event, + canary_response, + canary_mode, + skip_dns_host, + host_ip, + discovery_method, + ): + """ + Test a single virtual host candidate using HTTP Host header or HTTPS SNI + Returns virtual host data if detected, None otherwise + """ + is_https = event.parsed_url.scheme == "https" + + # Make request - different approach for HTTP vs HTTPS + if is_https: + port = event.parsed_url.port or 443 + probe_response = await self.helpers.request( + url=f"https://{probe_host}:{port}/", + resolve_ip=host_ip, + ) + else: + port = event.parsed_url.port or 80 + probe_response = await self.helpers.request( + url=normalized_url, + headers={"Host": probe_host}, + resolve_ip=host_ip, + ) + + if not probe_response or not probe_response.text: + protocol = "HTTPS" if is_https else "HTTP" + self.debug(f"{protocol} probe failed for {probe_host} on ip {host_ip} - no response or empty data") + return None + + similarity = await self.analyze_response(probe_host, probe_response, canary_response, event) + if similarity is None: + return None + + # Different from canary = possibly real virtual host, similar to canary = probably junk + if similarity > self.SIMILARITY_THRESHOLD: + self.debug( + f"REJECTING {probe_host}: similarity {similarity:.3f} > threshold {self.SIMILARITY_THRESHOLD} (too similar to canary)" + ) + return None + else: + self.verbose( + f"POTENTIAL 
VIRTUALHOST {probe_host} sim={similarity:.3f} " + f"probe: {probe_response.status_code} | {len(probe_response.text or '')}B | {probe_response.url} ; " + f"canary: {canary_response.status_code} | {len(canary_response.text or '')}B | {canary_response.url}" + ) + + # Re-verify canary consistency before emission + if not await self._verify_canary_consistency( + canary_response, canary_mode, normalized_url, is_https, basehost, host_ip + ): + self.verbose( + f"CANARY CHANGED: Rejecting {probe_host}. Original canary had code {canary_response.status_code} and response data of length {len(canary_response.text or '')}" + ) + raise RuntimeError(f"Canary changed since initial test, rejecting {probe_host}") + # Canary is consistent, proceed + + probe_url = f"{event.parsed_url.scheme}://{probe_host}:{port}/" + + # Check for keyword-based virtual host wildcards + if not await self._verify_canary_keyword(probe_response, probe_url, is_https, basehost, host_ip): + return None + + # Don't emit if this would be the same as the original netloc + if probe_host == event.parsed_url.netloc: + self.verbose(f"Skipping emit for virtual host {probe_host} - is the same as the original netloc") + return None + + # Check if this virtual host is externally accessible + port = event.parsed_url.port or (443 if is_https else 80) + + is_externally_accessible = await self._is_host_accessible(probe_url) + + virtualhost_dict = { + "host": str(event.host), + "url": normalized_url, + "virtual_host": probe_host, + "description": self._build_description( + discovery_method, probe_response, is_externally_accessible, host_ip + ), + "ip": host_ip, + } + + # Skip if we require inaccessible hosts and this one is accessible + if self.config.get("require_inaccessible", True) and is_externally_accessible: + self.verbose( + f"Skipping emit for virtual host {probe_host} - is externally accessible and require_inaccessible is True" + ) + return None + + # Return data for emission at _run_virtualhost_phase level + 
technique = "SNI" if is_https else "Host header" + return { + "virtualhost_dict": virtualhost_dict, + "similarity": similarity, + "probe_host": probe_host, + "skip_dns_host": skip_dns_host, + "discovery_method": f"{discovery_method} ({technique})", + "status_code": probe_response.status_code, + "content_length": len(probe_response.text or ""), + "probe_response": probe_response, + } + + async def analyze_response(self, probe_host, probe_response, canary_response, event): + probe_status = probe_response.status_code + canary_status = canary_response.status_code + + # Check for invalid/no response - skip processing + if probe_status == 0 or not probe_response.text: + self.debug(f"SKIPPING {probe_host} - no valid HTTP response (status: {probe_status})") + return None + + if probe_status == 400: + self.debug(f"SKIPPING {probe_host} - got 400 Bad Request") + return None + + # Check for 421 Misdirected Request - clear signal that virtual host doesn't exist + if probe_status == 421: + self.debug(f"SKIPPING {probe_host} - got 421 Misdirected Request (SNI not configured)") + return None + + if probe_status == 502 or probe_status == 503: + self.debug(f"SKIPPING {probe_host} - got 502 or 503 Bad Gateway") + return None + + # Check for 403 Forbidden - signal that the virtual host is rejected (unless we started with a 403) + if probe_status == 403 and canary_status != 403: + self.debug(f"SKIPPING {probe_host} - got 403 Forbidden when canary status was {canary_status}") + return None + + if probe_status == 508: + self.debug(f"SKIPPING {probe_host} - got 508 Loop Detected") + return None + + # Check for redirects back to original domain - indicates virtual host just redirects to canonical + if probe_status in [301, 302]: + redirect_url = probe_response.headers.get("location", "") if probe_response.headers else "" + if redirect_url and str(event.parsed_url.netloc) in redirect_url: + self.debug(f"SKIPPING {probe_host} - redirects back to original domain {event.parsed_url.netloc}") + 
return None + + if any(waf_string in (probe_response.text or "") for waf_string in self.waf_strings): + self.debug(f"SKIPPING {probe_host} - got WAF response") + return None + + # Calculate content similarity to canary (junk response) + # Use probe hostname for normalization to remove hostname reflection differences + + probe_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, probe_response.text or "", normalization_filter=probe_host + ) + canary_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, canary_response.text or "", normalization_filter=probe_host + ) + + similarity = self.helpers.simhash.similarity(probe_simhash, canary_simhash) + + if similarity <= self.SIMILARITY_THRESHOLD: + self.verbose( + f"POTENTIAL MATCH: {probe_host} vs canary - similarity: {similarity:.3f} (threshold: {self.SIMILARITY_THRESHOLD}), probe status: {probe_status}, canary status: {canary_status}" + ) + + return similarity + + async def _verify_canary_keyword(self, original_response, probe_url, is_https, basehost, host_ip): + """Perform last-minute check on the canary for keyword-based virtual host wildcards""" + + try: + keyword_canary_response = await self._get_canary_response( + probe_url, basehost, host_ip, is_https, mode="random_append" + ) + except Exception as e: + self.warning(f"Canary verification failed due to curl error: {e}") + return False + + if not keyword_canary_response: + return False + + # If we get the exact same content after altering the hostname, keyword based virtual host routing is likely being used + if (keyword_canary_response.text or "") == (original_response.text or ""): + self.verbose( + f"Intentionally wrong hostname has a canary too similar to the original. 
Using probe url: {probe_url} - response data is exactly the same" + ) + return False + + original_simhash = await self.helpers.run_in_executor_mp(compute_simhash, original_response.text or "") + keyword_simhash = await self.helpers.run_in_executor_mp(compute_simhash, keyword_canary_response.text or "") + similarity = self.helpers.simhash.similarity(original_simhash, keyword_simhash) + + if similarity >= self.SIMILARITY_THRESHOLD: + self.verbose( + f"Intentionally wrong hostname has a canary too similar to the original. Using probe url: {probe_url} - similarity: {similarity:.3f} above threshold {self.SIMILARITY_THRESHOLD} - Original: {original_response.status_code} ({len(original_response.text or '')} bytes), Current: {keyword_canary_response.status_code} ({len(keyword_canary_response.text or '')} bytes)" + ) + return False + return True + + async def _verify_canary_consistency( + self, original_canary_response, canary_mode, normalized_url, is_https, basehost, host_ip + ): + """Perform last-minute check on the canary for consistency""" + + # Re-run the same canary test as we did initially + try: + consistency_canary_response = await self._get_canary_response( + normalized_url, basehost, host_ip, is_https, mode=canary_mode + ) + except Exception as e: + self.warning(f"Canary verification failed due to curl error: {e}") + return False + + if not consistency_canary_response: + return False + + # Check if HTTP codes are different first (hard failure) + if original_canary_response.status_code != consistency_canary_response.status_code: + self.verbose( + f"CANARY HTTP CODE CHANGED for {normalized_url} - Original: {original_canary_response.status_code} ({len(original_canary_response.text or '')} bytes), Current: {consistency_canary_response.status_code} ({len(consistency_canary_response.text or '')} bytes)" + ) + return False + + # if response data is exactly the same, we're good + if (original_canary_response.text or "") == (consistency_canary_response.text or ""): + 
return True + + # Fallback - use similarity comparison for response data (allows slight differences) + original_simhash = await self.helpers.run_in_executor_mp(compute_simhash, original_canary_response.text or "") + consistency_simhash = await self.helpers.run_in_executor_mp( + compute_simhash, consistency_canary_response.text or "" + ) + similarity = self.helpers.simhash.similarity(original_simhash, consistency_simhash) + if similarity < self.SIMILARITY_THRESHOLD: + self.verbose( + f"CANARY SIMILARITY CHANGED for {normalized_url} - similarity: {similarity:.3f} below threshold {self.SIMILARITY_THRESHOLD} - Original: {original_canary_response.status_code} ({len(original_canary_response.text or '')} bytes), Current: {consistency_canary_response.status_code} ({len(consistency_canary_response.text or '')} bytes)" + ) + return False + return True + + def _extract_title(self, response_data): + """Extract title from HTML response""" + soup = self.helpers.beautifulsoup(response_data, "html.parser") + if soup and soup.title and soup.title.string: + return soup.title.string.strip() + return None + + def _build_description(self, discovery_string, probe_response, is_externally_accessible=None, host_ip=None): + """Build detailed description with discovery technique and content info""" + http_code = probe_response.status_code if probe_response else "N/A" + response_size = len(probe_response.text or "") if probe_response else 0 + + description = f"Discovery Technique: [{discovery_string}], Discovered Content: [Status Code: {http_code}]" + + # Add title if available + title = self._extract_title(probe_response.text or "" if probe_response else "") + if title: + description += f" [Title: {title}]" + description += f" [Size: {response_size} bytes]" + + # Add IP address if available + if host_ip: + description += f" [IP: {host_ip}]" + + # Add accessibility information if available + if is_externally_accessible is not None: + accessibility_status = "externally accessible" if 
async def finish(self):
    """Run the end-of-scan wordcloud phase against every host recorded in ``scanned_hosts``.

    Does nothing unless the ``wordcloud_check`` option is enabled and the
    scan-wide wordcloud has at least one key. Wordcloud keys are filtered
    (no dots, length between 2 and 15 inclusive) and written to a temp file
    that is used as the wordlist for a "subdomain" phase on each host that
    is not already in ``wordcloud_tried_hosts``.

    Hosts are skipped when no basehost can be derived, when the event has no
    resolved hosts, or when the wildcard canary check fails.
    """
    # phase 5: check existing hosts with wordcloud
    self.verbose(" === Starting Finish() Wordcloud check === ")
    if not self.config.get("wordcloud_check", False):
        self.debug("FINISH METHOD: Wordcloud check is disabled, skipping finish phase")
        return

    wordcloud_words = list(self.helpers.word_cloud.keys())
    if not wordcloud_words:
        self.verbose("FINISH METHOD: No wordcloud data available for finish phase")
        return

    # Drop words with dots (likely full domains), very long words and very short words (likely noise)
    filtered_words = [word for word in wordcloud_words if "." not in word and 2 <= len(word) <= 15]

    wordlist_file = self.helpers.tempfile(filtered_words, pipe=False)
    self.debug(
        f"FINISH METHOD: Starting wordcloud check on {len(self.scanned_hosts)} hosts using {len(filtered_words)} filtered words from wordcloud"
    )

    forced_basehost = self.config.get("force_basehost")

    for host, event in self.scanned_hosts.items():
        if host in self.wordcloud_tried_hosts:
            continue

        host_parsed_url = urlparse(host)

        if forced_basehost:
            basehost = forced_basehost
        else:
            try:
                basehost, _ = self._get_basehost(event)
            except ValueError as e:
                # _get_basehost raises when the hostname has no parent domain; one such
                # host must not abort the wordcloud phase for every remaining host
                self.debug(f"FINISH METHOD: Could not determine basehost for {host}, skipping wordcloud check: {e}")
                continue

        # Get fresh canary and original response for this host
        is_https = host_parsed_url.scheme == "https"

        if not event.resolved_hosts:
            self.debug(f"FINISH METHOD: No resolved hosts for {host}, skipping wordcloud check")
            continue

        host_ip = str(next(iter(event.resolved_hosts)))

        self.verbose(f"FINISH METHOD: Starting wildcard check for {host}")
        baseline_response = await self._get_baseline_response(event, host, host_ip)
        if not await self._wildcard_canary_check(
            host_parsed_url.scheme, host_parsed_url.netloc, event, host_ip, baseline_response
        ):
            self.debug(
                f"WILDCARD CHECK FAILED in finish: Skipping {host} in wordcloud phase - failed virtual host wildcard check"
            )
            self.wordcloud_tried_hosts.add(host)  # Mark as tried to avoid retrying
            continue

        self.debug(f"WILDCARD CHECK PASSED in finish: Proceeding with wordcloud mutations for {host}")

        await self._run_virtualhost_phase(
            "Target host wordcloud mutations",
            host,
            basehost,
            host_ip,
            is_https,
            event,
            "subdomain",
            wordlist=wordlist_file,
        )
        self.wordcloud_tried_hosts.add(host)
class VirtualhostTestBase(ModuleTestBase):
    """Base class for virtualhost tests with common setup"""

    async def setup_before_prep(self, module_test):
        """Patch ``random`` so canary hostnames are predictable."""
        # Fix randomness for predictable canary generation.
        # Accept any call signature so seed(), seed(x) and seed(x, version=2) are all no-ops.
        module_test.monkeypatch.setattr("random.seed", lambda *args, **kwargs: None)

        def predictable_choice(seq):
            # Always pick the first element, whatever sequence is passed in
            return seq[0]

        module_test.monkeypatch.setattr("random.choice", predictable_choice)

    async def setup_after_prep(self, module_test):
        """Register the subclass's ``request_handler`` for the ``/`` pattern."""
        expect_args = re.compile("/")
        module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler)
["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://localhost:8888/", + "URL", + parent=event, + tags=["status-200", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_special"] = DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved_hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to localhost (with or without port) + if not host_header or host_header in ["localhost", "localhost:8888"]: + return Response("baseline response from localhost", status=200) + + # Wildcard canary check + if re.match(r"[a-z]ocalhost(?::8888)?$", host_header): + return Response("different wildcard response", status=404) + + # Random canary requests (12 lowercase letters .com) + if re.match(r"^[a-z]{12}\.com(?::8888)?$", host_header): + return Response( + """ +
Random canary host.
""", + status=404, + ) + + # Special hosts responses - return different content than canary + if host_header == "host.docker.internal": + return Response("Docker internal host active", status=200) + if host_header == "127.0.0.1": + return Response("Loopback host active", status=200) + if host_header == "localhost": + return Response("Localhost virtual host active", status=200) + + # Default for any other requests - match canary content to avoid false positives + return Response( + """ +Random canary host.
""", + status=404, + ) + + def check(self, module_test, events): + special_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["host.docker.internal", "127.0.0.1", "localhost"]: + special_hosts_found.add(vhost) + + # Test description elements to ensure they are as expected + description = e.data["description"] + assert ( + "Discovery Technique: [Special virtual host list" in description + or "Discovery Technique: [Mutations on discovered" in description + ), f"Description missing or unexpected discovery technique: {description}" + assert "Status Code:" in description, f"Description missing status code: {description}" + assert "Size:" in description and "bytes" in description, ( + f"Description missing size: {description}" + ) + assert "IP: 127.0.0.1" in description, f"Description missing IP: {description}" + assert "Access:" in description, f"Description missing access status: {description}" + + assert len(special_hosts_found) >= 1, f"Failed to detect special virtual hosts. 
Found: {special_hosts_found}" + + +class TestVirtualhostBruteForce(VirtualhostTestBase): + """Test subdomain brute-force detection using HTTP Host headers""" + + targets = ["http://test.example:8888"] + modules_overrides = ["virtualhost"] # Remove http, we'll manually create URL events + test_wordlist = ["admin", "api", "test"] + config_overrides = { + "modules": { + "virtualhost": { + "brute_wordlist": tempwordlist(test_wordlist), + "subdomain_brute": True, # Enable brute force + "mutation_check": False, # Focus on brute force only + "special_hosts": False, # Focus on brute force only + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Call parent setup_after_prep to set up the HTTP server with request_handler + await super().setup_after_prep(module_test) + + # Set up DNS mocking for test.example to resolve to 127.0.0.1 + await module_test.mock_dns({"test.example": {"A": ["127.0.0.1"]}}) + + # Create a dummy module that will emit the URL event during the scan + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + # Create and emit URL event for virtualhost module to process + url_event = self.scan.make_event( + "http://test.example:8888/", "URL", parent=event, tags=["status-200", "ip-127.0.0.1"] + ) + await self.emit_event(url_event) + + # Add the dummy module to the scan + dummy_module = DummyModule(module_test.scan) + module_test.scan.modules["dummy_module"] = dummy_module + + # Patch virtualhost to inject resolved_hosts for URL events during the test + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, 
def check(self, module_test, events):
    """Assert that at least one of the brute-forced virtual hosts was reported."""
    expected_hosts = ["admin.test.example", "api.test.example", "test.test.example"]
    brute_hosts_found = {
        ev.data["virtual_host"]
        for ev in events
        if ev.type == "VIRTUAL_HOST" and ev.data["virtual_host"] in expected_hosts
    }

    assert len(brute_hosts_found) >= 1, f"Failed to detect brute-force virtual hosts. Found: {brute_hosts_found}"
= vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to target.test (with or without port) + if not host_header or host_header in ["subdomain.target.test", "subdomain.target.test:8888"]: + return Response("baseline response from target.test", status=200) + + # Wildcard canary check + if re.match(r"[a-z]subdomain\.target\.test(?::8888)?$", host_header): # Modified target.test + return Response("wildcard canary response", status=404) + + # Mutation canary requests (4 chars + dash + original host) + if re.match(r"^[a-z]{4}-subdomain\.target\.test(?::8888)?$", host_header): + return Response("Mutation Canary", status=404) + + # Word cloud mutation matches - return different content than canary + if host_header == "subdomain-dev.target.test": + return Response("Dev target 1 found!", status=200) + if host_header == "devsubdomain.target.test": + return Response("Dev target 2 found!", status=200) + if host_header == "subdomaintest.target.test": + return Response("Test target found!", status=200) + + # Default response + return Response( + """\nDefault handler response.
def check(self, module_test, events):
    """Assert that at least one dev/test mutation of the target host was reported.

    Only VIRTUAL_HOST events under ``.target.test`` are considered, and the
    keyword match is done on the part of the hostname in front of that suffix.
    """
    basehost_suffix = ".target.test"
    mutation_hosts_found = set()
    for e in events:
        if e.type != "VIRTUAL_HOST":
            continue
        vhost = e.data["virtual_host"]
        if not vhost.endswith(basehost_suffix):
            continue
        # Look for mutation patterns with dev/test in the label only: matching against the
        # full hostname would let the ".test" TLD satisfy the "test" keyword for every host
        label = vhost[: -len(basehost_suffix)]
        if any(word in label for word in ("dev", "test")):
            mutation_hosts_found.add(vhost)

    assert len(mutation_hosts_found) >= 1, (
        f"Failed to detect mutation virtual hosts. Found: {mutation_hosts_found}"
    )
DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to wordcloud.test (with or without port) + if not host_header or host_header in ["wordcloud.test", "wordcloud.test:8888"]: + return Response("baseline response from wordcloud.test", status=200) + + # Wildcard canary check + if re.match(r"[a-z]ordcloud\.test(?::8888)?$", host_header): # Modified wordcloud.test + return Response("wildcard canary response", status=404) + + # Random canary requests (12 chars + .com) + if re.match(r"^[a-z]{12}\.com(?::8888)?$", host_header): + return Response("random canary response", status=404) + + # Wordcloud-based matches - these are checked in finish() + if host_header in ["staging.wordcloud.test", "staging.wordcloud.test:8888"]: + return Response("Staging environment found!", status=200) + if host_header in ["prod.wordcloud.test", "prod.wordcloud.test:8888"]: + return Response("Production environment found!", status=200) + if host_header in ["dev.wordcloud.test", "dev.wordcloud.test:8888"]: + return Response("Development environment found!", status=200) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + wordcloud_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["staging.wordcloud.test", "prod.wordcloud.test", "dev.wordcloud.test"]: + wordcloud_hosts_found.add(vhost) + + assert len(wordcloud_hosts_found) >= 1, ( + f"Failed to detect wordcloud virtual hosts. 
class TestVirtualhostHTTPSLogic(ModuleTestBase):
    """Unit tests for HTTPS/SNI-specific functions"""

    targets = ["http://localhost:8888"]  # Minimal target for unit testing
    modules_overrides = ["http", "virtualhost"]

    async def setup_before_prep(self, module_test):
        pass  # No special setup needed

    async def setup_after_prep(self, module_test):
        pass  # No HTTP mocking needed for unit tests

    def check(self, module_test, events):
        """Call ``_get_canary_random_host`` directly and check each mode's hostname pattern."""
        # Get the virtualhost module instance for direct testing
        virtualhost_module = next(
            (
                module
                for module in module_test.scan.modules.values()
                if hasattr(module, "special_virtualhost_list")
            ),
            None,
        )

        assert virtualhost_module is not None, "Could not find virtualhost module instance"

        # Test canary host generation for different modes
        canary_subdomain = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "subdomain")
        canary_mutation = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "mutation")
        canary_random = virtualhost_module._get_canary_random_host("test.example.com", ".example.com", "random")

        # Verify canary patterns
        assert canary_subdomain.endswith(".example.com"), (
            f"Subdomain canary doesn't end with basehost: {canary_subdomain}"
        )
        assert "-test.example.com" in canary_mutation, (
            f"Mutation canary doesn't contain expected pattern: {canary_mutation}"
        )
        assert canary_random.endswith(".com"), f"Random canary doesn't end with .com: {canary_random}"

        # Test that all canaries are different. A chained `a != b != c` only compares
        # neighbours (a with b, b with c), so compare all three through a set instead.
        assert len({canary_subdomain, canary_mutation, canary_random}) == 3, "Canaries should be different"
+ "modules": { + "virtualhost": { + "brute_wordlist": tempwordlist(test_wordlist), + "force_basehost": "forced.domain", # Test force_basehost functionality + "subdomain_brute": True, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + } + } + } + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline request to the IP + if not host_header or host_header == "127.0.0.1:8888": + return Response("baseline response from IP", status=200) + + # Wildcard canary check + if re.match(r"[0-9]27\.0\.0\.1:8888", host_header): + return Response("wildcard canary response", status=404) + + # Subdomain canary (12 random chars + .forced.domain) + if re.match(r"[a-z]{12}\.forced\.domain", host_header): + return Response("forced domain canary response", status=404) + + # Virtual hosts using forced basehost + if host_header == "admin.forced.domain": + return Response("Admin with forced basehost found!", status=200) + if host_header == "api.forced.domain": + return Response("API with forced basehost found!", status=200) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + forced_hosts_found = set() + for e in events: + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["admin.forced.domain", "api.forced.domain"]: + forced_hosts_found.add(vhost) + + # Verify the description shows it used the forced basehost + description = e.data["description"] + assert "Subdomain Brute-force" in description, ( + f"Expected subdomain brute-force discovery: {description}" + ) + + assert len(forced_hosts_found) >= 1, ( + f"Failed to detect virtual hosts with force_basehost. Found: {forced_hosts_found}. 
" + f"Expected at least one of: admin.forced.domain, api.forced.domain" + ) + + +class TestVirtualhostInterestingDefaultContent(VirtualhostTestBase): + """Test reporting of interesting default canary content during wildcard check""" + + targets = ["http://interesting.test:8888"] + modules_overrides = ["http", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": False, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "report_interesting_default_content": True, + "require_inaccessible": False, + } + } + } + + async def setup_after_prep(self, module_test): + # Start HTTP server + await super().setup_after_prep(module_test) + + # Mock DNS resolution for interesting.test + await module_test.mock_dns({"interesting.test": {"A": ["127.0.0.1"]}}) + + # Dummy module to emit the URL event for the virtualhost module + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_interesting" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://interesting.test:8888/", + "URL", + parent=event, + tags=["status-404", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_interesting"] = DummyModule(module_test.scan) + + # Patch virtualhost to inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline response for original host (ensure status differs from canary) + if not host_header or host_header in ["interesting.test", "interesting.test:8888"]: 
+ return Response("baseline not found", status=404) + + # Wildcard canary mutated hostname: change first alpha to 'z' -> znteresting.test + if host_header in ["znteresting.test", "znteresting.test:8888"]: + long_body = ( + "This is a sufficiently long default page body that exceeds forty characters " + "to trigger the interesting default content branch." + ) + return Response(long_body, status=200) + + # Default + return Response("default response", status=404) + + def check(self, module_test, events): + found_interesting = False + found_correct_host = False + for e in events: + if e.type == "VIRTUAL_HOST": + desc = e.data.get("description", "") + if "Interesting Default Content (from intentionally-incorrect canary host)" in desc: + found_interesting = True + # The VIRTUAL_HOST should be the canary hostname used in the wildcard request + if e.data.get("virtual_host") == "znteresting.test": + found_correct_host = True + break + + assert found_interesting, "Expected VIRTUAL_HOST from interesting default canary content was not emitted" + assert found_correct_host, "virtual_host should equal the canary hostname 'znteresting.test'" + + +class TestVirtualhostKeywordWildcard(VirtualhostTestBase): + """Test keyword-based wildcard detection using 'www' in hostname""" + + targets = ["http://acme.test:8888"] + modules_overrides = ["http", "virtualhost"] + config_overrides = { + "modules": { + "virtualhost": { + "subdomain_brute": True, + "mutation_check": False, + "special_hosts": False, + "certificate_sans": False, + "wordcloud_check": False, + "require_inaccessible": False, + # Keep brute_lines small and supply a tiny wordlist containing a 'www' entry and an exact match + } + } + } + + async def setup_after_prep(self, module_test): + # Start HTTP server with wildcard behavior for any hostname containing 'www' + await super().setup_after_prep(module_test) + + # Mock DNS resolution for acme.test + await module_test.mock_dns({"acme.test": {"A": ["127.0.0.1"]}}) + + # Provide a 
tiny custom wordlist containing 'wwwfoo' and 'admin' so that: + # - 'wwwfoo' would be a false positive without the keyword-based wildcard detection + # - 'admin' will be an exact match we deliberately allow via the response handler + from .base import tempwordlist + + words = ["wwwfoo", "admin"] + wl = tempwordlist(words) + + # Patch virtualhost to use our custom wordlist and inject resolved hosts + vh_module = module_test.scan.modules["virtualhost"] + original_setup = vh_module.setup + + async def patched_setup(): + await original_setup() + vh_module.brute_wordlist = wl + return True + + module_test.monkeypatch.setattr(vh_module, "setup", patched_setup) + + # Emit URL event manually and ensure resolved_hosts + from bbot.modules.base import BaseModule + + class DummyModule(BaseModule): + _name = "dummy_module_keyword" + watched_events = ["SCAN"] + + async def handle_event(self, event): + if event.type == "SCAN": + url_event = self.scan.make_event( + "http://acme.test:8888/", + "URL", + parent=event, + tags=["status-404", "ip-127.0.0.1"], + ) + await self.emit_event(url_event) + + module_test.scan.modules["dummy_module_keyword"] = DummyModule(module_test.scan) + + # Inject resolved hosts for the URL + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + host_header = request.headers.get("Host", "").lower() + + # Baseline response for original host + if not host_header or host_header in ["acme.test", "acme.test:8888"]: + return Response("baseline not found", status=404) + + # If hostname contains 'www' anywhere, return the same body as baseline (simulating keyword wildcard) + if "www" in host_header: + return Response("baseline not found", status=404) + + # Exact-match virtual host that should still be detected + if host_header in 
def check(self, module_test, events):
    """Assert the exact-match host was reported and no host containing 'www' was."""
    reported = [ev.data.get("virtual_host") for ev in events if ev.type == "VIRTUAL_HOST"]

    found_admin = "admin.acme.test" in reported
    # Skip empty/missing hostnames before looking for the wildcard keyword
    found_www = any("www" in name for name in reported if name)

    assert found_admin, "Expected VIRTUAL_HOST for admin.acme.test was not emitted"
    assert not found_www, "No VIRTUAL_HOST should be emitted for 'www' keyword wildcard entries"
dummy module to the scan + dummy_module = DummyModule(module_test.scan) + module_test.scan.modules["dummy_module_secrets"] = dummy_module + + # Patch virtualhost to inject resolved_hosts for URL events during the test + vh_module = module_test.scan.modules["virtualhost"] + orig_handle_event = vh_module.handle_event + + async def patched_handle_event(ev): + ev._resolved_hosts = {"127.0.0.1"} + return await orig_handle_event(ev) + + module_test.monkeypatch.setattr(vh_module, "handle_event", patched_handle_event) + + def request_handler(self, request): + from werkzeug.wrappers import Response + + host_header = request.headers.get("Host", "").lower() + + # Baseline request to secrets.test (with or without port) + if not host_header or host_header in ["secrets.test", "secrets.test:8888"]: + return Response("baseline response from secrets.test", status=200) + + # Wildcard canary check - change one character in secrets.test + if re.match(r"[a-z]ecrets\.test", host_header): + return Response("wildcard canary different response", status=404) + + # Brute-force canary requests - random string + .secrets.test (with optional port) + if re.match(r"^[a-z]{12}\.secrets\.test(?::8888)?$", host_header): + return Response("subdomain canary response", status=404) + + # Virtual host with vulnerable JWT cookie and JWT in body - both using weak secret '1234' - this should trigger badsecrets twice + if host_header in ["admin.secrets.test", "admin.secrets.test:8888"]: + return Response( + "Admin Panel
", + status=200, + headers={ + "set-cookie": "vulnjwt=eyJhbGciOiJIUzI1NiJ9.eyJJc3N1ZXIiOiJJc3N1ZXIiLCJVc2VybmFtZSI6IkJhZFNlY3JldHMiLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.ovqRikAo_0kKJ0GVrAwQlezymxrLGjcEiW_s3UJMMCo; secure" + }, + ) + + # Default response + return Response("default response", status=404) + + def check(self, module_test, events): + virtual_host_found = False + http_response_found = False + jwt_cookie_vuln_found = False + jwt_body_vuln_found = False + + # Debug: print all events to see what we're getting + print(f"\n=== DEBUG: Found {len(events)} events ===") + for e in events: + print(f"Event: {e.type} - {e.data}") + if hasattr(e, "tags"): + print(f" Tags: {e.tags}") + + for e in events: + # Check for virtual host discovery + if e.type == "VIRTUAL_HOST": + vhost = e.data["virtual_host"] + if vhost in ["admin.secrets.test"]: + virtual_host_found = True + # Verify it has the virtual-host tag + assert "virtual-host" in e.tags, f"VIRTUAL_HOST event missing virtual-host tag: {e.tags}" + + # Check for HTTP_RESPONSE with virtual-host tag + elif e.type == "HTTP_RESPONSE": + if "virtual-host" in e.tags: + http_response_found = True + # Verify the HTTP_RESPONSE has the expected format + assert "input" in e.data, f"HTTP_RESPONSE missing input field: {e.data}" + assert e.data["input"] == "admin.secrets.test", f"HTTP_RESPONSE input mismatch: {e.data['input']}" + assert "status_code" in e.data, f"HTTP_RESPONSE missing status_code: {e.data}" + assert e.data["status_code"] == 200, f"HTTP_RESPONSE status_code mismatch: {e.data['status_code']}" + # Debug: print the response data to see what badsecrets is analyzing + print(f"HTTP_RESPONSE data: {e.data}") + + # Check for badsecrets findings + elif e.type == "FINDING": + print(f"Found FINDING event: {e.data}") + description = e.data["description"] + + # Check for JWT vulnerability (from cookie) + if ( + "1234" in description + and 
"eyJhbGciOiJIUzI1NiJ9.eyJJc3N1ZXIiOiJJc3N1ZXIiLCJVc2VybmFtZSI6IkJhZFNlY3JldHMiLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.ovqRikAo_0kKJ0GVrAwQlezymxrLGjcEiW_s3UJMMCo" + in description + and "JWT" in description + ): + jwt_cookie_vuln_found = True + + # Check for JWT vulnerability (from body) + if ( + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxMjMsInVzZXJuYW1lIjoiYWRtaW4iLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.03xPSXavrMk0HK4BD3_hPKgu3RLu6CmTSPGfrDx2qpg" + in description + and "JWT" in description + ): + jwt_body_vuln_found = True + + assert virtual_host_found, "Failed to detect virtual host admin.secrets.test" + assert http_response_found, "Failed to detect HTTP_RESPONSE event with virtual-host tag" + assert jwt_cookie_vuln_found, ( + "Failed to detect JWT vulnerability - JWT with weak secret '1234' should have been found" + ) + assert jwt_body_vuln_found, ( + "Failed to detect JWT vulnerability in body - JWT 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoxMjMsInVzZXJuYW1lIjoiYWRtaW4iLCJleHAiOjE1OTMxMzM0ODMsImlhdCI6MTQ2NjkwMzA4M30.03xPSXavrMk0HK4BD3_hPKgu3RLu6CmTSPGfrDx2qpg' should have been found" + ) + print( + f"Test results: virtual_host_found={virtual_host_found}, http_response_found={http_response_found}, jwt_cookie_vuln_found={jwt_cookie_vuln_found}, jwt_body_vuln_found={jwt_body_vuln_found}" + )