Skip to content
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
697b6e6
feat: Add Redis-based workspace stream quota for WebRTC sessions
rafel-roboflow Feb 20, 2026
269d2c4
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 20, 2026
b9cefda
added debug
rafel-roboflow Feb 20, 2026
e1af7f9
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 20, 2026
b83f828
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 20, 2026
fd0c661
more debug; pass envs to modal env
rafel-roboflow Feb 20, 2026
b7c9a62
feat: add detailed Redis logging for session tracking debugging"
rafel-roboflow Feb 23, 2026
3de18e7
Fix issue with RF-Detr model post-processing in TRT
PawelPeczek-Roboflow Feb 23, 2026
4362ca0
Merge remote-tracking branch 'origin/fix/patch-post-processing-in-rfd…
rafel-roboflow Feb 23, 2026
ee50855
clean up
rafel-roboflow Feb 23, 2026
37650bd
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 23, 2026
e19d8ac
clean up
rafel-roboflow Feb 23, 2026
400f71e
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 23, 2026
002e2a3
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 23, 2026
aba5acd
set max conn webrtc to 10
rafel-roboflow Feb 24, 2026
71b3048
isorted
rafel-roboflow Feb 24, 2026
459bbd5
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Feb 27, 2026
35b5c62
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 27, 2026
c7c9e91
fix bug heartbeat session without auth; fix bug allowing refresh non …
rafel-roboflow Feb 27, 2026
47fa8f2
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Feb 27, 2026
8a173d7
removed unnecessary workspace id from refresh webrtc api
rafel-roboflow Feb 27, 2026
91a1e06
calling hearbeat in watchdog just at the beginning insterad of waitin…
rafel-roboflow Feb 27, 2026
8d790fb
implemented deregister session to avoid keeping sessions forever
rafel-roboflow Feb 27, 2026
cd8ebc3
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Feb 27, 2026
49f7f15
fix wrong endpoint refactor
rafel-roboflow Feb 27, 2026
b23bd72
include always workspace id
rafel-roboflow Feb 27, 2026
3579f5d
set ttl on redis zadd to avoid redis growing without control
rafel-roboflow Feb 27, 2026
5f7889f
prevent calls if not api key to /usage/plan
rafel-roboflow Mar 2, 2026
bfd3f93
Revert "prevent calls if not api key to /usage/plan"
rafel-roboflow Mar 2, 2026
5a00f91
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
grzegorz-roboflow Mar 4, 2026
1fc3b1e
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 4, 2026
68c9342
add missing @with_route_exceptions_async decorator to WebRTC session …
rafel-roboflow Mar 4, 2026
6f66d68
Add proper error handling and request validation to WebRTC session en…
rafel-roboflow Mar 4, 2026
dfe6c22
clean up unused var
rafel-roboflow Mar 4, 2026
54731ae
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 4, 2026
12b50de
added redis total sessions count
rafel-roboflow Mar 4, 2026
037206d
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 4, 2026
8f03e1d
fix: return proper HTTP error codes for WebRTC session auth failures
rafel-roboflow Mar 4, 2026
c68fa5c
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 4, 2026
5bbca17
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 4, 2026
fe59347
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 9, 2026
61d6582
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 9, 2026
f2811b8
debug adding heartbeat debug
rafel-roboflow Mar 9, 2026
c3821bd
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 9, 2026
a418923
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 9, 2026
d092189
revert debug log
rafel-roboflow Mar 9, 2026
08e4d70
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 17, 2026
074c78d
Merge branch 'main' into feat/dg-232-set-rate-limit-to-10-concurrent-…
rafel-roboflow Mar 17, 2026
940c004
raise http 500 error if no workspace id found
rafel-roboflow Mar 19, 2026
fd4b7e9
Merge branch 'main' of github.com:roboflow/inference into feat/dg-232…
rafel-roboflow Mar 19, 2026
4f0476c
Merge branch 'feat/dg-232-set-rate-limit-to-10-concurrent-streams-and…
rafel-roboflow Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions inference/core/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,31 @@
WEBRTC_MODAL_USAGE_QUOTA_ENABLED = str2bool(
os.getenv("WEBRTC_MODAL_USAGE_QUOTA_ENABLED", "False")
)

#
# Workspace stream quota
#
# Redis-based rate limiting that rejects new WebRTC streams from a workspace
# that is already over its limit of concurrent connections.
# Master switch for the quota check; disabled by default.
WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED = str2bool(
    os.getenv("WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED", "False")
)
# Maximum number of concurrent streams allowed per workspace
WEBRTC_WORKSPACE_STREAM_QUOTA = int(os.getenv("WEBRTC_WORKSPACE_STREAM_QUOTA", "10"))
# TTL in seconds for active stream entries (auto-expire if no explicit cleanup)
WEBRTC_WORKSPACE_STREAM_TTL_SECONDS = int(
    os.getenv("WEBRTC_WORKSPACE_STREAM_TTL_SECONDS", "60")
)
# URL for Modal to send session heartbeats to keep the session alive.
# Defaults to None when the environment variable is not set.
# Example: "https://serverless.roboflow.com/webrtc/session/heartbeat"
WEBRTC_SESSION_HEARTBEAT_URL = os.getenv(
    "WEBRTC_SESSION_HEARTBEAT_URL",
    None,
)
# How often Modal sends session heartbeats (in seconds)
# NOTE(review): presumably this must stay below
# WEBRTC_WORKSPACE_STREAM_TTL_SECONDS so a live session is refreshed before
# its entry expires - confirm against the heartbeat sender.
WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS = int(
    os.getenv("WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS", "30")
)

WEBRTC_DATA_CHANNEL_BUFFER_DRAINING_DELAY = float(
os.getenv("WEBRTC_DATA_CHANNEL_BUFFER_DRAINING_DELAY", "0.1")
)
Expand Down
11 changes: 11 additions & 0 deletions inference/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,14 @@ class WebRTCConfigurationError(Exception):

class CreditsExceededError(Exception):
    """Raised when a request is refused because the API key is over quota."""

    pass


class WorkspaceStreamQuotaError(Exception):
    """Raised when a workspace exceeds its concurrent WebRTC stream quota.

    Signals that the workspace has reached its maximum number of concurrent
    WebRTC streams. The limit exists so that a single workspace cannot
    consume all of the shared Modal resources.
    """
19 changes: 19 additions & 0 deletions inference/core/interfaces/http/error_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
ServiceConfigurationError,
WebRTCConfigurationError,
WorkspaceLoadError,
WorkspaceStreamQuotaError,
)
from inference.core.interfaces.stream_manager.api.errors import (
ProcessesManagerAuthorisationError,
Expand Down Expand Up @@ -444,6 +445,15 @@ def wrapped_route(*args, **kwargs):
"error_type": "CreditsExceededError",
},
)
except WorkspaceStreamQuotaError as error:
logger.error("%s: %s", type(error).__name__, error)
resp = JSONResponse(
status_code=429,
content={
"message": str(error),
"error_type": "WorkspaceStreamQuotaError",
},
)
except Exception as error:
logger.exception("%s: %s", type(error).__name__, error)
resp = JSONResponse(status_code=500, content={"message": "Internal error."})
Expand Down Expand Up @@ -816,6 +826,15 @@ async def wrapped_route(*args, **kwargs):
"error_type": "CreditsExceededError",
},
)
except WorkspaceStreamQuotaError as error:
logger.error("%s: %s", type(error).__name__, error)
resp = JSONResponse(
status_code=429,
content={
"message": str(error),
"error_type": "WorkspaceStreamQuotaError",
},
)
except Exception as error:
logger.exception("%s: %s", type(error).__name__, error)
resp = JSONResponse(status_code=500, content={"message": "Internal error."})
Expand Down
70 changes: 70 additions & 0 deletions inference/core/interfaces/http/http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,14 @@
)
from inference.core.interfaces.webrtc_worker import start_worker
from inference.core.interfaces.webrtc_worker.entities import (
WebRTCSessionHeartbeatRequest,
WebRTCWorkerRequest,
WebRTCWorkerResult,
)
from inference.core.interfaces.webrtc_worker.utils import (
deregister_webrtc_session,
refresh_webrtc_session,
)
from inference.core.managers.base import ModelManager
from inference.core.managers.metrics import get_container_stats
from inference.core.managers.prometheus import InferenceInstrumentator
Expand Down Expand Up @@ -1673,6 +1678,71 @@ async def initialise_webrtc_worker(
type=worker_result.answer.type,
)

@app.post(
    "/webrtc/session/heartbeat",
    summary="WebRTC session heartbeat",
)
@with_route_exceptions_async
async def webrtc_session_heartbeat(
    request: WebRTCSessionHeartbeatRequest,
) -> dict:
    """Refresh an active WebRTC session so it keeps counting as alive.

    The caller posts the session id together with an API key. The key is
    resolved to a workspace and the session entry for that workspace is
    refreshed. A session that stops sending heartbeats drops out of the
    quota count once its TTL elapses.

    Returns:
        ``{"status": "ok"}`` when the session was refreshed, otherwise
        ``{"status": "error", "message": ...}`` with ``"unauthorized"``
        (workspace lookup failed) or ``"session not found"``.
    """
    # NOTE(review): both failure payloads are returned as plain dicts, so
    # they are sent with the route's default success status code - confirm
    # that callers inspect the "status" field rather than the HTTP status.
    try:
        workspace_id = await get_roboflow_workspace_async(api_key=request.api_key)
    except (RoboflowAPINotAuthorizedError, WorkspaceLoadError):
        return {"status": "error", "message": "unauthorized"}

    if refresh_webrtc_session(
        workspace_id=workspace_id,
        session_id=request.session_id,
    ):
        return {"status": "ok"}
    return {"status": "error", "message": "session not found"}

@app.post(
    "/webrtc/session/heartbeat/end",
    summary="End WebRTC session",
)
@with_route_exceptions_async
async def webrtc_session_end(
    request: WebRTCSessionHeartbeatRequest,
) -> dict:
    """Deregister a WebRTC session so its quota slot is freed right away.

    The API key in the request is resolved to a workspace, and the given
    session is then deregistered for that workspace.

    Returns:
        ``{"status": "ok"}`` after deregistration, or
        ``{"status": "error", "message": "unauthorized"}`` when the
        workspace lookup fails.
    """
    # NOTE(review): the unauthorized payload is a plain dict, so it is sent
    # with the route's default success status code - confirm callers check
    # the "status" field.
    try:
        workspace_id = await get_roboflow_workspace_async(api_key=request.api_key)
    except (RoboflowAPINotAuthorizedError, WorkspaceLoadError):
        return {"status": "error", "message": "unauthorized"}

    deregister_webrtc_session(
        workspace_id=workspace_id,
        session_id=request.session_id,
    )
    return {"status": "ok"}

if ENABLE_STREAM_API:

@app.get(
Expand Down
55 changes: 53 additions & 2 deletions inference/core/interfaces/webrtc_worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import multiprocessing
import uuid

from inference.core.env import (
WEBRTC_MODAL_TOKEN_ID,
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_USAGE_QUOTA_ENABLED,
WEBRTC_WORKSPACE_STREAM_QUOTA,
WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED,
WEBRTC_WORKSPACE_STREAM_TTL_SECONDS,
)
from inference.core.exceptions import CreditsExceededError
from inference.core.exceptions import CreditsExceededError, WorkspaceStreamQuotaError
from inference.core.interfaces.webrtc_worker.cpu import rtc_peer_connection_process
from inference.core.interfaces.webrtc_worker.entities import (
RTCIceServer,
Expand All @@ -15,6 +19,7 @@
WebRTCWorkerResult,
)
from inference.core.logger import logger
from inference.core.roboflow_api import get_roboflow_workspace


async def start_worker(
Expand All @@ -36,7 +41,12 @@ async def start_worker(
from inference.core.interfaces.webrtc_worker.modal import (
spawn_rtc_peer_connection_modal,
)
from inference.core.interfaces.webrtc_worker.utils import is_over_quota
from inference.core.interfaces.webrtc_worker.utils import (
get_total_concurrent_sessions,
is_over_quota,
is_over_workspace_session_quota,
register_webrtc_session,
)
except ImportError:
raise ImportError(
"Modal not installed, please install it using 'pip install modal'"
Expand All @@ -46,6 +56,47 @@ async def start_worker(
logger.error("API key over quota")
raise CreditsExceededError("API key over quota")

session_id = str(uuid.uuid4())
workspace_id = get_roboflow_workspace(api_key=webrtc_request.api_key)
webrtc_request.workspace_id = workspace_id
webrtc_request.session_id = session_id

if WEBRTC_WORKSPACE_STREAM_QUOTA_ENABLED:
if workspace_id and is_over_workspace_session_quota(
workspace_id=workspace_id,
quota=WEBRTC_WORKSPACE_STREAM_QUOTA,
ttl_seconds=WEBRTC_WORKSPACE_STREAM_TTL_SECONDS,
):
logger.warning(
"Workspace %s has exceeded the stream quota of %d",
workspace_id,
WEBRTC_WORKSPACE_STREAM_QUOTA,
)
raise WorkspaceStreamQuotaError(
f"You have reached the maximum of {WEBRTC_WORKSPACE_STREAM_QUOTA} "
f"concurrent streams."
)

if workspace_id:
register_webrtc_session(
workspace_id=workspace_id,
session_id=session_id,
)

total_sessions = get_total_concurrent_sessions(
ttl_seconds=WEBRTC_WORKSPACE_STREAM_TTL_SECONDS
)
logger.info(
"Total concurrent WebRTC sessions: %d",
total_sessions,
)

logger.info(
"Started WebRTC session %s for workspace %s",
session_id,
workspace_id,
)

loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
Expand Down
10 changes: 10 additions & 0 deletions inference/core/interfaces/webrtc_worker/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class WebRTCWorkerRequest(BaseModel):
# must be valid region: https://modal.com/docs/guide/region-selection#region-options
requested_region: Optional[str] = None

workspace_id: Optional[str] = None
session_id: Optional[str] = None


class WebRTCVideoMetadata(BaseModel):
frame_id: int
Expand Down Expand Up @@ -86,6 +89,13 @@ class WebRTCWorkerResult(BaseModel):
inner_error: Optional[str] = None


class WebRTCSessionHeartbeatRequest(BaseModel):
    """Request body for WebRTC session heartbeat and end endpoints."""

    # Identifier of the WebRTC session to refresh or end
    session_id: str
    # API key the endpoints use to look up the caller's workspace
    api_key: str


class StreamOutputMode(str, Enum):
AUTO_DETECT = "auto_detect" # None -> auto-detect first image
NO_VIDEO = "no_video" # [] -> no video track
Expand Down
11 changes: 11 additions & 0 deletions inference/core/interfaces/webrtc_worker/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
WEBRTC_MODAL_TOKEN_SECRET,
WEBRTC_MODAL_USAGE_QUOTA_ENABLED,
WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS,
WEBRTC_SESSION_HEARTBEAT_URL,
WORKFLOWS_CUSTOM_PYTHON_EXECUTION_MODE,
)
from inference.core.exceptions import (
Expand Down Expand Up @@ -210,6 +212,12 @@ def check_nvidia_smi_gpu() -> str:
"WEBRTC_GZIP_PREVIEW_FRAME_COMPRESSION": str(
WEBRTC_GZIP_PREVIEW_FRAME_COMPRESSION
),
"WEBRTC_SESSION_HEARTBEAT_URL": (
WEBRTC_SESSION_HEARTBEAT_URL if WEBRTC_SESSION_HEARTBEAT_URL else ""
),
"WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS": str(
WEBRTC_SESSION_HEARTBEAT_INTERVAL_SECONDS
),
},
"volumes": {MODEL_CACHE_DIR: rfcache_volume},
}
Expand Down Expand Up @@ -388,6 +396,9 @@ def send_answer(obj: WebRTCWorkerResult):
watchdog = Watchdog(
api_key=webrtc_request.api_key,
timeout_seconds=WEBRTC_MODAL_WATCHDOG_TIMEMOUT,
workspace_id=getattr(webrtc_request, "workspace_id", None),
session_id=getattr(webrtc_request, "session_id", None),
heartbeat_url=WEBRTC_SESSION_HEARTBEAT_URL,
)

try:
Expand Down
Loading