Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
697b6e6
feat: Add Redis-based workspace stream quota for WebRTC sessions
rafel-roboflow Feb 20, 2026
269d2c4
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 20, 2026
b9cefda
added debug
rafel-roboflow Feb 20, 2026
e1af7f9
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 20, 2026
b83f828
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 20, 2026
fd0c661
more debug; pass envs to modal env
rafel-roboflow Feb 20, 2026
b7c9a62
feat: add detailed Redis logging for session tracking debugging
rafel-roboflow Feb 23, 2026
3de18e7
Fix issue with RF-Detr model post-processing in TRT
PawelPeczek-Roboflow Feb 23, 2026
4362ca0
Merge remote-tracking branch 'origin/fix/patch-post-processing-in-rfd…
rafel-roboflow Feb 23, 2026
ee50855
clean up
rafel-roboflow Feb 23, 2026
37650bd
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 23, 2026
e19d8ac
clean up
rafel-roboflow Feb 23, 2026
400f71e
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 23, 2026
002e2a3
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 23, 2026
aba5acd
set max conn webrtc to 10
rafel-roboflow Feb 24, 2026
71b3048
isorted
rafel-roboflow Feb 24, 2026
459bbd5
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 27, 2026
35b5c62
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 27, 2026
c7c9e91
fix bug heartbeat session without auth; fix bug allowing refresh non …
rafel-roboflow Feb 27, 2026
47fa8f2
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 27, 2026
8a173d7
removed unnecessary workspace id from refresh webrtc api
rafel-roboflow Feb 27, 2026
91a1e06
calling heartbeat in watchdog just at the beginning instead of waitin…
rafel-roboflow Feb 27, 2026
8d790fb
implemented deregister session to avoid keeping sessions forever
rafel-roboflow Feb 27, 2026
cd8ebc3
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 27, 2026
49f7f15
fix wrong endpoint refactor
rafel-roboflow Feb 27, 2026
b23bd72
include always workspace id
rafel-roboflow Feb 27, 2026
3579f5d
set ttl on redis zadd to avoid redis growing without control
rafel-roboflow Feb 27, 2026
5f7889f
prevent calls if not api key to /usage/plan
rafel-roboflow Mar 2, 2026
bfd3f93
Revert "prevent calls if not api key to /usage/plan"
rafel-roboflow Mar 2, 2026
5a00f91
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
grzegorz-roboflow Mar 4, 2026
1fc3b1e
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 4, 2026
68c9342
add missing @with_route_exceptions_async decorator to WebRTC session …
rafel-roboflow Mar 4, 2026
6f66d68
Add proper error handling and request validation to WebRTC session en…
rafel-roboflow Mar 4, 2026
dfe6c22
clean up unused var
rafel-roboflow Mar 4, 2026
54731ae
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 4, 2026
12b50de
added redis total sessions count
rafel-roboflow Mar 4, 2026
037206d
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 4, 2026
8f03e1d
fix: return proper HTTP error codes for WebRTC session auth failures
rafel-roboflow Mar 4, 2026
c68fa5c
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 4, 2026
5bbca17
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 4, 2026
fe59347
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 9, 2026
61d6582
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 9, 2026
f2811b8
debug adding heartbeat debug
rafel-roboflow Mar 9, 2026
c3821bd
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 9, 2026
a418923
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 9, 2026
d092189
revert debug log
rafel-roboflow Mar 9, 2026
08e4d70
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 17, 2026
074c78d
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 17, 2026
940c004
raise http 500 error if no workspace id found
rafel-roboflow Mar 19, 2026
fd4b7e9
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 19, 2026
4f0476c
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions inference/core/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,31 @@
WEBRTC_MODAL_USAGE_QUOTA_ENABLED = str2bool(
    os.getenv("WEBRTC_MODAL_USAGE_QUOTA_ENABLED", "False")
)

#
# Workspace stream quota
#
# Redis-based rate limiting that disables more than N concurrent
# connections from a single workspace
WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED = str2bool(
    os.getenv("WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED", "False")
)
# Maximum number of concurrent WebRTC streams allowed per workspace.
WEBRTC_WORKSPACE_STREAM_QUOTA = int(os.getenv("WEBRTC_WORKSPACE_STREAM_QUOTA", "10"))
# TTL in seconds for active stream entries (auto-expire if no explicit cleanup)
WEBRTC_WORKSPACE_STREAM_TTL_SECONDS = int(
    os.getenv("WEBRTC_WORKSPACE_STREAM_TTL_SECONDS", "60")
)
# URL for Modal to send session heartbeats to keep session alive
# Example: "https://serverless.roboflow.com/webrtc/session/heartbeat"
# NOTE(review): presumably heartbeating is disabled when this is unset —
# confirm against the Watchdog implementation that consumes it.
WEBRTC_SESSION_HEARTBEAT_URL = os.getenv(
    "WEBRTC_SESSION_HEARTBEAT_URL",
    None,
)
# How often Modal sends session heartbeats (in seconds). Should be kept
# well below WEBRTC_WORKSPACE_STREAM_TTL_SECONDS so a live session is
# refreshed before its entry is considered expired.
WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS = int(
    os.getenv("WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS", "30")
)

WEBRTC_DATA_CHANNEL_BUFFER_DRAINING_DELAY = float(
    os.getenv("WEBRTC_DATA_CHANNEL_BUFFER_DRAINING_DELAY", "0.1")
)
Expand Down
11 changes: 11 additions & 0 deletions inference/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,14 @@ class WebRTCConfigurationError(Exception):

class CreditsExceededError(Exception):
pass


class WorkspaceStreamQuotaError(Exception):
    """Raised when a workspace exceeds its concurrent WebRTC stream quota.

    Each workspace may hold at most a configured number of simultaneous
    WebRTC streams; once that ceiling is reached, new stream requests fail
    with this error so a single workspace cannot exhaust the shared Modal
    resources.
    """
19 changes: 19 additions & 0 deletions inference/core/interfaces/http/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ContentTypeMissing,
CreditsExceededError,
InferenceModelNotFound,
WorkspaceStreamQuotaError,
InputImageLoadError,
InvalidEnvironmentVariableError,
InvalidMaskDecodeArgument,
Expand Down Expand Up @@ -378,6 +379,15 @@ def wrapped_route(*args, **kwargs):
"error_type": "CreditsExceededError",
},
)
except WorkspaceStreamQuotaError as error:
logger.error("%s: %s", type(error).__name__, error)
resp = JSONResponse(
status_code=429,
content={
"message": str(error),
"error_type": "WorkspaceStreamQuotaError",
},
)
except Exception as error:
logger.exception("%s: %s", type(error).__name__, error)
resp = JSONResponse(status_code=500, content={"message": "Internal error."})
Expand Down Expand Up @@ -699,6 +709,15 @@ async def wrapped_route(*args, **kwargs):
"error_type": "CreditsExceededError",
},
)
except WorkspaceStreamQuotaError as error:
logger.error("%s: %s", type(error).__name__, error)
resp = JSONResponse(
status_code=429,
content={
"message": str(error),
"error_type": "WorkspaceStreamQuotaError",
},
)
except Exception as error:
logger.exception("%s: %s", type(error).__name__, error)
resp = JSONResponse(status_code=500, content={"message": "Internal error."})
Expand Down
31 changes: 31 additions & 0 deletions inference/core/interfaces/http/http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@
WebRTCWorkerRequest,
WebRTCWorkerResult,
)
from inference.core.interfaces.webrtc_worker.utils import refresh_webrtc_session
from inference.core.managers.base import ModelManager
from inference.core.managers.metrics import get_container_stats
from inference.core.managers.prometheus import InferenceInstrumentator
Expand Down Expand Up @@ -1599,6 +1600,36 @@ async def initialise_webrtc_worker(
type=worker_result.answer.type,
)

@app.post(
    "/webrtc/session/heartbeat",
    summary="WebRTC session heartbeat",
    description="Called by Modal workers to keep their session alive in the quota tracking system",
)
async def webrtc_session_heartbeat(
    request: Request,
) -> dict:
    """Receive heartbeat for an active WebRTC session.

    This endpoint is called periodically by Modal workers to indicate
    that their session is still active. The session will be removed from
    the quota count if no heartbeat is received within the TTL period.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is missing
            ``workspace_id`` / ``session_id``.
    """
    try:
        body = await request.json()
    except Exception:
        # A malformed/missing JSON body must surface as a client error,
        # not an unhandled 500.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")

    workspace_id = body.get("workspace_id")
    session_id = body.get("session_id")

    if not workspace_id or not session_id:
        # Previously returned HTTP 200 with an error payload, which hides
        # the failure from heartbeating workers; report a proper 400.
        raise HTTPException(
            status_code=400,
            detail="workspace_id and session_id required",
        )

    refresh_webrtc_session(
        workspace_id=workspace_id,
        session_id=session_id,
    )
    return {"status": "ok"}

if ENABLE_STREAM_API:

@app.get(
Expand Down
46 changes: 44 additions & 2 deletions inference/core/interfaces/webrtc_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import multiprocessing
import uuid

from inference.core.env import (
WEBRTC_MODAL_TOKEN_ID,
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_USAGE_QUOTA_ENABLED,
WEBRTC_WORKSPACE_STREAM_QUOTA,
WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED,
WEBRTC_WORKSPACE_STREAM_TTL_SECONDS,
)
from inference.core.exceptions import CreditsExceededError
from inference.core.exceptions import CreditsExceededError, WorkspaceStreamQuotaError
from inference.core.interfaces.webrtc_worker.cpu import rtc_peer_connection_process
from inference.core.interfaces.webrtc_worker.entities import (
RTCIceServer,
Expand All @@ -15,6 +19,7 @@
WebRTCWorkerResult,
)
from inference.core.logger import logger
from inference.core.roboflow_api import get_roboflow_workspace


async def start_worker(
Expand All @@ -36,7 +41,11 @@
from inference.core.interfaces.webrtc_worker.modal import (
spawn_rtc_peer_connection_modal,
)
from inference.core.interfaces.webrtc_worker.utils import is_over_quota
from inference.core.interfaces.webrtc_worker.utils import (
is_over_quota,
is_over_workspace_session_quota,
register_webrtc_session,
)
except ImportError:
raise ImportError(
"Modal not installed, please install it using 'pip install modal'"
Expand All @@ -46,6 +55,39 @@
logger.error("API key over quota")
raise CreditsExceededError("API key over quota")

workspace_id = None
session_id = str(uuid.uuid4())
if WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED:
workspace_id = get_roboflow_workspace(api_key=webrtc_request.api_key)
if workspace_id and is_over_workspace_session_quota(
workspace_id=workspace_id,
quota=WEBRTC_WORKSPACE_STREAM_QUOTA,
ttl_seconds=WEBRTC_WORKSPACE_STREAM_TTL_SECONDS,
):
logger.warning(
"Workspace %s has exceeded the stream quota of %d",
workspace_id,
WEBRTC_WORKSPACE_STREAM_QUOTA,
)
raise WorkspaceStreamQuotaError(
f"You have reached the maximum of {WEBRTC_WORKSPACE_STREAM_QUOTA} "
f"concurrent streams. Please contact Roboflow to increase your quota."
)

if workspace_id:
register_webrtc_session(
workspace_id=workspace_id,
session_id=session_id,
)
# we need to pass to modal how to identifier workspace/ session id.
webrtc_request.workspace_id = workspace_id
webrtc_request.session_id = session_id
logger.info(
"Started WebRTC session %s for workspace %s",
session_id,
workspace_id,
)

loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
Expand Down
3 changes: 3 additions & 0 deletions inference/core/interfaces/webrtc_worker/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class WebRTCWorkerRequest(BaseModel):
# must be valid region: https://modal.com/docs/guide/region-selection#region-options
requested_region: Optional[str] = None

workspace_id: Optional[str] = None
session_id: Optional[str] = None


class WebRTCVideoMetadata(BaseModel):
frame_id: int
Expand Down
4 changes: 4 additions & 0 deletions inference/core/interfaces/webrtc_worker/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_USAGE_QUOTA_ENABLED,
WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
WEBRTC_SESSION_HEARTBEAT_URL,
WORKFLOWS_CUSTOM_PYTHON_EXECUTION_MODE,
)
from inference.core.exceptions import (
Expand Down Expand Up @@ -388,6 +389,9 @@ def send_answer(obj: WebRTCWorkerResult):
watchdog = Watchdog(
api_key=webrtc_request.api_key,
timeout_seconds=WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
workspace_id=webrtc_request.workspace_id,
session_id=webrtc_request.session_id,
heartbeat_url=WEBRTC_SESSION_HEARTBEAT_URL,
)

try:
Expand Down
110 changes: 110 additions & 0 deletions inference/core/interfaces/webrtc_worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from av import VideoFrame

from inference.core import logger
from inference.core.cache import cache
from inference.core.cache.redis import RedisCache
from inference.core.env import DEBUG_WEBRTC_PROCESSING_LATENCY
from inference.core.interfaces.camera.entities import VideoFrame as InferenceVideoFrame
from inference.core.interfaces.stream.inference_pipeline import InferencePipeline
Expand Down Expand Up @@ -240,6 +242,114 @@
return is_over_quota


def _get_concurrent_sessions_key(workspace_id: str) -> str:
"""Get the Redis key for tracking concurrent sessions for a workspace."""
return f"webrtc:concurrent_sessions:{workspace_id}"


def register_webrtc_session(
    workspace_id: str, session_id: str, ttl_seconds: Optional[int] = None
) -> None:
    """Register a new concurrent WebRTC session for a workspace.

    Adds the session to a Redis sorted set with current timestamp as score.
    Expired entries are cleaned up on read via ZREMRANGEBYSCORE (O(log N + M)).

    Args:
        workspace_id: The workspace identifier
        session_id: Unique identifier for this session
        ttl_seconds: Optional key-level TTL. When provided, EXPIRE is
            (re)set on the whole sorted set so the key cannot linger in
            Redis forever for a workspace that never opens another stream
            (member pruning only happens on reads).
    """
    if not isinstance(cache, RedisCache):
        logger.warning(
            "Redis not available, skipping session registration for rate limiting"
        )
        return

    key = _get_concurrent_sessions_key(workspace_id)
    try:
        cache.client.zadd(key, {session_id: time.time()})
        if ttl_seconds is not None:
            # Safety net against unbounded Redis growth: without a key TTL
            # the sorted set survives indefinitely once reads stop.
            cache.client.expire(key, ttl_seconds)
        logger.info(
            "Registered session: workspace=%s, session=%s",
            workspace_id,
            session_id,
        )
    except Exception as e:
        logger.error("Failed to register session: %s", e)


def refresh_webrtc_session(workspace_id: str, session_id: str) -> None:
    """Refresh the timestamp for a concurrent WebRTC session.

    Should be called periodically to keep the session marked as active.
    If not refreshed, the session will be considered expired after TTL.

    Only sessions that are already registered are refreshed (ZADD with
    ``xx=True``): a heartbeat for an unknown or already-deregistered
    session must not silently re-create the entry, otherwise arbitrary
    heartbeat calls could inflate a workspace's session count and keep
    phantom sessions alive forever.

    Args:
        workspace_id: The workspace identifier
        session_id: The session identifier to refresh
    """
    if not isinstance(cache, RedisCache):
        return

    key = _get_concurrent_sessions_key(workspace_id)
    try:
        # xx=True: only update existing members, never add new ones.
        cache.client.zadd(key, {session_id: time.time()}, xx=True)
    except Exception as e:
        logger.error("Failed to refresh session: %s", e)


def get_concurrent_session_count(workspace_id: str, ttl_seconds: int) -> int:
    """Count the non-expired concurrent sessions registered for a workspace.

    Members whose score (registration/refresh timestamp) is older than
    ``ttl_seconds`` are pruned from the sorted set before counting.

    Args:
        workspace_id: The workspace identifier
        ttl_seconds: TTL in seconds - entries older than this are considered expired

    Returns:
        Number of concurrent sessions for the workspace
    """
    if not isinstance(cache, RedisCache):
        logger.warning(
            "Redis not available, cannot count concurrent sessions - allowing request"
        )
        return 0

    sessions_key = _get_concurrent_sessions_key(workspace_id)
    expiry_cutoff = time.time() - ttl_seconds

    try:
        # Drop every member that has expired, then count the survivors.
        cache.client.zremrangebyscore(sessions_key, "-inf", expiry_cutoff)
        return cache.client.zcard(sessions_key)
    except Exception as redis_error:
        logger.error("Failed to get concurrent session count: %s", redis_error)
        return 0


def is_over_workspace_session_quota(
    workspace_id: str, quota: int, ttl_seconds: int
) -> bool:
    """Tell whether a workspace is at or beyond its concurrent-session quota.

    Args:
        workspace_id: The workspace identifier
        quota: Maximum number of concurrent sessions allowed
        ttl_seconds: TTL for considering sessions as active

    Returns:
        True if the workspace has reached or exceeded the quota
    """
    active_sessions = get_concurrent_session_count(workspace_id, ttl_seconds)
    logger.info(
        "Workspace %s has %d concurrent sessions (quota: %d)",
        workspace_id,
        active_sessions,
        quota,
    )
    return active_sessions >= quota


def get_video_fps(filepath: str) -> Optional[float]:
"""Detect video FPS from container metadata.

Expand Down
Loading
Loading