Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions src/crawlee/_request_throttler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Per-domain rate limit tracker for handling HTTP 429 responses.
# See: https://github.com/apify/crawlee-python/issues/1437

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from logging import getLogger
from urllib.parse import urlparse

from crawlee._utils.docs import docs_group

logger = getLogger(__name__)


@dataclass
class _DomainState:
"""Tracks rate limit state for a single domain."""

domain: str
"""The domain being tracked."""

next_allowed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
"""Earliest time the next request to this domain is allowed."""

consecutive_429_count: int = 0
"""Number of consecutive 429 responses (for exponential backoff)."""


@docs_group('Crawlers')
class RequestThrottler:
    """Per-domain rate limit tracker and request throttler.

    When a target website returns HTTP 429 (Too Many Requests), this component
    tracks the rate limit event per domain and applies exponential backoff.
    Requests to other (non-rate-limited) domains are unaffected.

    This solves the "death spiral" problem where 429 responses reduce CPU usage,
    causing the `AutoscaledPool` to incorrectly scale UP concurrency.
    """

    _BASE_DELAY = timedelta(seconds=2)
    """Initial delay after the first 429 response from a domain."""

    _MAX_DELAY = timedelta(seconds=60)
    """Maximum delay between requests to a rate-limited domain."""

    _MAX_BACKOFF_EXPONENT = 30
    """Cap on the backoff exponent so that `2 ** n` cannot overflow `timedelta`
    arithmetic after a long streak of consecutive 429 responses. The resulting
    delay is still clamped to `_MAX_DELAY` afterwards."""

    def __init__(self) -> None:
        # Populated lazily: a domain gets an entry only after its first 429 response.
        self._domain_states: dict[str, _DomainState] = {}

    @staticmethod
    def _extract_domain(url: str) -> str:
        """Extract the domain (hostname) from a URL.

        Args:
            url: The URL to extract the domain from.

        Returns:
            The hostname portion of the URL, or an empty string if parsing fails.
        """
        return urlparse(url).hostname or ''

    def record_rate_limit(self, url: str, *, retry_after: timedelta | None = None) -> None:
        """Record a 429 Too Many Requests response for the domain of the given URL.

        Increments the consecutive 429 count and calculates the next allowed
        request time using exponential backoff or the Retry-After value.

        Args:
            url: The URL that received a 429 response.
            retry_after: Optional delay from the Retry-After header. If provided,
                it takes priority over the calculated exponential backoff.
        """
        domain = self._extract_domain(url)
        if not domain:
            # Unparseable URL - there is nothing sensible to track.
            return

        state = self._domain_states.setdefault(domain, _DomainState(domain=domain))
        state.consecutive_429_count += 1

        # The Retry-After header, when present, takes priority over exponential backoff.
        if retry_after is not None:
            delay = retry_after
        else:
            # Exponential backoff: _BASE_DELAY * 2^(n-1). The exponent is capped so the
            # intermediate product cannot overflow timedelta arithmetic on long streaks.
            exponent = min(state.consecutive_429_count - 1, self._MAX_BACKOFF_EXPONENT)
            delay = self._BASE_DELAY * (2**exponent)

        # Cap the delay at _MAX_DELAY.
        delay = min(delay, self._MAX_DELAY)
        state.next_allowed_at = datetime.now(timezone.utc) + delay

        # Lazy %-style args: the message is only rendered when INFO logging is enabled.
        logger.info(
            'Rate limit (429) detected for domain "%s" (consecutive: %d, delay: %.1fs)',
            domain,
            state.consecutive_429_count,
            delay.total_seconds(),
        )

    def is_throttled(self, url: str) -> bool:
        """Check if requests to the domain of the given URL should be delayed.

        Args:
            url: The URL to check.

        Returns:
            True if the domain is currently rate-limited and the cooldown has not expired.
        """
        state = self._domain_states.get(self._extract_domain(url))
        return state is not None and datetime.now(timezone.utc) < state.next_allowed_at

    def get_delay(self, url: str) -> timedelta:
        """Get the remaining delay before the next request to this domain is allowed.

        Args:
            url: The URL to check.

        Returns:
            The remaining time to wait. Returns zero if no delay is needed.
        """
        state = self._domain_states.get(self._extract_domain(url))
        if state is None:
            return timedelta(0)

        # Never return a negative delta once the cooldown has already expired.
        return max(state.next_allowed_at - datetime.now(timezone.utc), timedelta(0))

    def record_success(self, url: str) -> None:
        """Record a successful request to the domain, resetting its backoff state.

        Only the consecutive 429 counter is reset; an already scheduled
        `next_allowed_at` cooldown keeps being honored until it expires.

        Args:
            url: The URL that received a successful response.
        """
        state = self._domain_states.get(self._extract_domain(url))
        if state is not None and state.consecutive_429_count > 0:
            logger.debug('Resetting rate limit state for domain "%s" after successful request', state.domain)
            state.consecutive_429_count = 0
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,12 @@ async def _handle_status_code_response(
"""
status_code = context.http_response.status_code
if self._retry_on_blocked:
self._raise_for_session_blocked_status_code(context.session, status_code)
self._raise_for_session_blocked_status_code(
context.session,
status_code,
url=context.request.url,
retry_after_header=context.http_response.headers.get('retry-after'),
)
self._raise_for_error_status_code(status_code)
yield context

Expand Down
70 changes: 68 additions & 2 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from asyncio import CancelledError
from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence
from contextlib import AsyncExitStack, suppress
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from functools import partial
from io import StringIO
from pathlib import Path
Expand Down Expand Up @@ -46,6 +46,7 @@
from crawlee._utils.docs import docs_group
from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream
from crawlee._utils.recurring_task import RecurringTask
from crawlee._request_throttler import RequestThrottler
from crawlee._utils.robots import RobotsTxtFile
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
from crawlee._utils.wait import wait_for
Expand Down Expand Up @@ -485,6 +486,7 @@ async def persist_state_factory() -> KeyValueStore:
self._robots_txt_file_cache: LRUCache[str, RobotsTxtFile] = LRUCache(maxsize=1000)
self._robots_txt_lock = asyncio.Lock()
self._tld_extractor = TLDExtract(cache_dir=tempfile.TemporaryDirectory().name)
self._request_throttler = RequestThrottler()
self._snapshotter = Snapshotter.from_config(config)
self._autoscaled_pool = AutoscaledPool(
system_status=SystemStatus(self._snapshotter),
Expand Down Expand Up @@ -1396,6 +1398,15 @@ async def __run_task_function(self) -> None:
if request is None:
return

# Check if this domain is currently rate-limited (429 backoff).
if self._request_throttler.is_throttled(request.url):
self._logger.debug(
f'Request to {request.url} delayed - domain is rate-limited '
f'(retry in {self._request_throttler.get_delay(request.url).total_seconds():.1f}s)'
)
await request_manager.reclaim_request(request)
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If, at some point, the request queue contains only requests from a throttled domain, this will become a busy wait with extra steps. If you're using the Apify platform, this will cost a lot in request queue writes.

I'm afraid that this means we cannot accept the PR in the current state. See the main review comments for possible next steps.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully addressed in the refactor. The reclaim-based throttle block was removed. ThrottlingRequestManager.fetch_next_request() now handles scheduling and awaits asyncio.sleep() when all domains are throttled, eliminating busy-wait and extra queue writes.


if not (await self._is_allowed_based_on_robots_txt_file(request.url)):
self._logger.warning(
f'Skipping request {request.url} ({request.unique_key}) because it is disallowed based on robots.txt'
Expand Down Expand Up @@ -1442,6 +1453,9 @@ async def __run_task_function(self) -> None:

await self._mark_request_as_handled(request)

# Record successful request to reset rate limit backoff for this domain.
self._request_throttler.record_success(request.url)

if session and session.is_usable:
session.mark_good()

Expand Down Expand Up @@ -1542,22 +1556,74 @@ def _raise_for_error_status_code(self, status_code: int) -> None:
if is_status_code_server_error(status_code) and not is_ignored_status:
raise HttpStatusCodeError('Error status code returned', status_code)

def _raise_for_session_blocked_status_code(self, session: Session | None, status_code: int) -> None:
def _raise_for_session_blocked_status_code(
self,
session: Session | None,
status_code: int,
*,
url: str = '',
retry_after_header: str | None = None,
) -> None:
"""Raise an exception if the given status code indicates the session is blocked.

If the status code is 429 (Too Many Requests), the domain is recorded as
rate-limited in the `RequestThrottler` for per-domain backoff.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outdated name


Args:
session: The session used for the request. If None, no check is performed.
status_code: The HTTP status code to check.
url: The request URL, used for per-domain rate limit tracking.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the parameter should be renamed to request_url so that it's not ambiguous with a URL after redirects.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Renamed to request_url across BasicCrawler, AbstractHttpCrawler, and PlaywrightCrawler.

retry_after_header: The value of the Retry-After response header, if present.

Raises:
SessionError: If the status code indicates the session is blocked.
"""
if status_code == 429 and url:
retry_after = self._parse_retry_after_header(retry_after_header)
self._request_throttler.record_rate_limit(url, retry_after=retry_after)

if session is not None and session.is_blocked_status_code(
status_code=status_code,
ignore_http_error_status_codes=self._ignore_http_error_status_codes,
):
raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')

@staticmethod
def _parse_retry_after_header(value: str | None) -> timedelta | None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has no business being in BasicCrawler. Better put it in the _utils module.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to crawlee._utils.http.parse_retry_after_header in the refactor commit.

"""Parse the Retry-After HTTP header value.

The header can contain either a number of seconds or an HTTP-date.
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After

Args:
value: The raw Retry-After header value.

Returns:
A timedelta representing the delay, or None if the header is missing or unparseable.
"""
if not value:
return None

# Try parsing as integer seconds first.
try:
seconds = int(value)
return timedelta(seconds=seconds)
except ValueError:
pass

# Try parsing as HTTP-date (e.g., "Wed, 21 Oct 2015 07:28:00 GMT").
from email.utils import parsedate_to_datetime

try:
retry_date = parsedate_to_datetime(value)
delay = retry_date - datetime.now(retry_date.tzinfo or timezone.utc)
if delay.total_seconds() > 0:
return delay
except (ValueError, TypeError):
pass

return None

def _check_request_collision(self, request: Request, session: Session | None) -> None:
"""Raise an exception if a request cannot access required resources.

Expand Down
8 changes: 7 additions & 1 deletion src/crawlee/crawlers/_playwright/_playwright_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,13 @@ async def _handle_status_code_response(
"""
status_code = context.response.status
if self._retry_on_blocked:
self._raise_for_session_blocked_status_code(context.session, status_code)
retry_after_header = context.response.headers.get('retry-after')
self._raise_for_session_blocked_status_code(
context.session,
status_code,
url=context.request.url,
retry_after_header=retry_after_header,
)
self._raise_for_error_status_code(status_code)
yield context

Expand Down
Loading