From 5c2050f11f7c03936dff67fb35df2aaa12c345a1 Mon Sep 17 00:00:00 2001 From: Alex Norell Date: Fri, 13 Mar 2026 15:53:33 -0700 Subject: [PATCH] Add /stats and /latest_frame endpoints for device-manager Adds two new HTTP endpoints behind the ENABLE_STREAM_API flag: - GET /stats: returns aggregated camera_fps, inference_fps, and stream_count across all active pipelines. Reuses existing list_pipelines/get_status IPC -- no new commands needed. - GET /inference_pipelines/{pipeline_id}/latest_frame: returns the most recent frame as a base64-encoded JPEG with metadata. Adds a new LATEST_FRAME IPC command that peeks at the buffer non-destructively. --- inference/core/interfaces/http/http_api.py | 59 +++ .../interfaces/stream_manager/api/entities.py | 9 + .../api/stream_manager_client.py | 21 + .../stream_manager/manager_app/entities.py | 1 + .../manager_app/inference_pipeline_manager.py | 53 +++ .../test_stats_and_latest_frame.py | 398 ++++++++++++++++++ 6 files changed, 541 insertions(+) create mode 100644 tests/inference/unit_tests/core/interfaces/stream_manager/test_stats_and_latest_frame.py diff --git a/inference/core/interfaces/http/http_api.py b/inference/core/interfaces/http/http_api.py index 0171cf7d03..a6b993fd73 100644 --- a/inference/core/interfaces/http/http_api.py +++ b/inference/core/interfaces/http/http_api.py @@ -231,6 +231,7 @@ InferencePipelineStatusResponse, InitializeWebRTCPipelineResponse, InitializeWebRTCResponse, + LatestFrameResponse, ListPipelinesResponse, ) from inference.core.interfaces.stream_manager.api.stream_manager_client import ( @@ -1893,6 +1894,64 @@ async def consume( excluded_fields=request.excluded_fields, ) + @app.get( + "/stats", + summary="Aggregated pipeline statistics", + ) + @with_route_exceptions_async + async def get_stats(): + stream_count = 0 + camera_fps_values = [] + inference_fps_values = [] + if self.stream_manager_client is not None: + try: + pipelines_resp = ( + await self.stream_manager_client.list_pipelines() + ) + pipeline_ids = pipelines_resp.pipelines + stream_count = len(pipeline_ids) + for pid in pipeline_ids: + status_resp = await self.stream_manager_client.get_status( + pid + ) + report = status_resp.report + throughput = report.get("inference_throughput", 0.0) + if throughput and throughput > 0: + inference_fps_values.append(throughput) + for src in report.get("sources_metadata", []): + props = src.get("source_properties") or {} + fps = props.get("fps") + if fps and fps > 0: + camera_fps_values.append(fps) + except Exception: + pass + return { + "camera_fps": ( + sum(camera_fps_values) / len(camera_fps_values) + if camera_fps_values + else None + ), + "inference_fps": ( + sum(inference_fps_values) / len(inference_fps_values) + if inference_fps_values + else None + ), + "stream_count": stream_count, + } + + @app.get( + "/inference_pipelines/{pipeline_id}/latest_frame", + response_model=LatestFrameResponse, + summary="[EXPERIMENTAL] Get latest frame from InferencePipeline", + ) + @with_route_exceptions_async + async def latest_frame( + pipeline_id: str, + ) -> LatestFrameResponse: + return await self.stream_manager_client.get_latest_frame( + pipeline_id=pipeline_id + ) + class ModelInitState: """Class to track model initialization state.""" diff --git a/inference/core/interfaces/stream_manager/api/entities.py b/inference/core/interfaces/stream_manager/api/entities.py index a0413b411e..0fefd560a3 100644 --- a/inference/core/interfaces/stream_manager/api/entities.py +++ b/inference/core/interfaces/stream_manager/api/entities.py @@ -37,6 +37,15 @@ class ConsumePipelineResponse(CommandResponse): frames_metadata: List[FrameMetadata] +class LatestFrameResponse(CommandResponse): + frame_data: Optional[str] = Field( + default=None, description="Base64-encoded JPEG image" + ) + frame_id: Optional[int] = Field(default=None) + frame_timestamp: Optional[datetime] = Field(default=None) + source_id: Optional[int] = Field(default=None) + + class InitializeWebRTCPipelineResponse(CommandResponse): sdp: str type: str diff --git a/inference/core/interfaces/stream_manager/api/stream_manager_client.py b/inference/core/interfaces/stream_manager/api/stream_manager_client.py index 8edd226b09..c23ca8f355 100644 --- a/inference/core/interfaces/stream_manager/api/stream_manager_client.py +++ b/inference/core/interfaces/stream_manager/api/stream_manager_client.py @@ -13,6 +13,7 @@ FrameMetadata, InferencePipelineStatusResponse, InitializeWebRTCPipelineResponse, + LatestFrameResponse, ListPipelinesResponse, ) from inference.core.interfaces.stream_manager.api.errors import ( @@ -200,6 +201,26 @@ async def consume_pipeline_result( ], ) + async def get_latest_frame(self, pipeline_id: str) -> LatestFrameResponse: + command = { + TYPE_KEY: CommandType.LATEST_FRAME, + PIPELINE_ID_KEY: pipeline_id, + } + response = await self._handle_command(command=command) + status = response[RESPONSE_KEY][STATUS_KEY] + context = CommandContext( + request_id=response.get(REQUEST_ID_KEY), + pipeline_id=response.get(PIPELINE_ID_KEY), + ) + return LatestFrameResponse( + status=status, + context=context, + frame_data=response[RESPONSE_KEY].get("frame_data"), + frame_id=response[RESPONSE_KEY].get("frame_id"), + frame_timestamp=response[RESPONSE_KEY].get("frame_timestamp"), + source_id=response[RESPONSE_KEY].get("source_id"), + ) + async def _handle_command(self, command: dict) -> dict: response = await send_command( host=self._host, diff --git a/inference/core/interfaces/stream_manager/manager_app/entities.py b/inference/core/interfaces/stream_manager/manager_app/entities.py index ddeb9453d0..4e808b3fa9 100644 --- a/inference/core/interfaces/stream_manager/manager_app/entities.py +++ b/inference/core/interfaces/stream_manager/manager_app/entities.py @@ -49,6 +49,7 @@ class CommandType(str, Enum): TERMINATE = "terminate" LIST_PIPELINES = "list_pipelines" CONSUME_RESULT = "consume_result" + LATEST_FRAME = "latest_frame" class VideoConfiguration(BaseModel): diff --git a/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py index 9fd7be91a9..d31ea7ce6a 100644 --- a/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py +++ b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py @@ -1,4 +1,5 @@ import asyncio +import base64 import json import os import signal @@ -148,6 +149,8 @@ def _handle_command(self, request_id: str, payload: dict) -> None: return self._get_pipeline_status(request_id=request_id) if command_type is CommandType.CONSUME_RESULT: return self._consume_results(request_id=request_id, payload=payload) + if command_type is CommandType.LATEST_FRAME: + return self._handle_latest_frame(request_id=request_id) raise NotImplementedError( f"Command type `{command_type}` cannot be handled" ) @@ -636,6 +639,56 @@ def _consume_results(self, request_id: str, payload: dict) -> None: error_type=ErrorType.OPERATION_ERROR, ) + def _handle_latest_frame(self, request_id: str) -> None: + try: + if self._buffer_sink is None or self._buffer_sink.empty(): + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "frame_data": None, + "frame_id": None, + "frame_timestamp": None, + "source_id": None, + } + self._responses_queue.put((request_id, response_payload)) + return None + # Peek at the last item in the buffer (non-destructive) + predictions, frames = self._buffer_sink._buffer[-1] + # Find the last non-None VideoFrame + frame = None + for f in reversed(frames): + if f is not None: + frame = f + break + if frame is None: + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "frame_data": None, + "frame_id": None, + "frame_timestamp": None, + "source_id": None, + } + self._responses_queue.put((request_id, response_payload)) + return None + _, jpeg_bytes = cv.imencode( + ".jpg", frame.image, [cv.IMWRITE_JPEG_QUALITY, 70] + ) + frame_b64 = base64.b64encode(jpeg_bytes.tobytes()).decode("ascii") + response_payload = { + STATUS_KEY: OperationStatus.SUCCESS, + "frame_data": frame_b64, + "frame_id": frame.frame_id, + "frame_timestamp": frame.frame_timestamp.isoformat(), + "source_id": frame.source_id, + } + self._responses_queue.put((request_id, response_payload)) + except Exception as error: + self._handle_error( + request_id=request_id, + error=error, + public_error_message="Unexpected error retrieving latest frame.", + error_type=ErrorType.OPERATION_ERROR, + ) + def _handle_error( self, request_id: str, diff --git a/tests/inference/unit_tests/core/interfaces/stream_manager/test_stats_and_latest_frame.py b/tests/inference/unit_tests/core/interfaces/stream_manager/test_stats_and_latest_frame.py new file mode 100644 index 0000000000..df225de87d --- /dev/null +++ b/tests/inference/unit_tests/core/interfaces/stream_manager/test_stats_and_latest_frame.py @@ -0,0 +1,398 @@ +""" +Tests for /stats and /latest_frame endpoints, and the pipeline manager +handler for LATEST_FRAME command. +""" + +import base64 +from collections import deque +from datetime import datetime +from multiprocessing import Queue +from unittest import mock +from unittest.mock import AsyncMock, MagicMock + +import cv2 +import numpy as np +import pytest + +from inference.core.interfaces.camera.entities import VideoFrame +from inference.core.interfaces.stream.sinks import InMemoryBufferSink +from inference.core.interfaces.stream_manager.manager_app import ( + inference_pipeline_manager, +) +from inference.core.interfaces.stream_manager.manager_app.entities import ( + CommandType, + OperationStatus, +) +from inference.core.interfaces.stream_manager.manager_app.inference_pipeline_manager import ( + InferencePipelineManager, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_status_response(report): + resp = MagicMock() + resp.report = report + return resp + + +def _make_list_response(pipeline_ids): + resp = MagicMock() + resp.pipelines = pipeline_ids + return resp + + +def _full_report(**overrides): + report = { + "inference_throughput": 10.0, + "sources_metadata": [ + { + "source_properties": { + "width": 1920, + "height": 1080, + "total_frames": 0, + "is_file": False, + "fps": 30.0, + }, + "source_reference": "rtsp://example.com/stream", + "source_id": 0, + } + ], + } + report.update(overrides) + return report + + +# --------------------------------------------------------------------------- +# /stats logic tests (testing the aggregation logic directly) +# --------------------------------------------------------------------------- + + +class TestStatsAggregation: + """Tests for the /stats endpoint aggregation logic.""" + + def test_stats_returns_nulls_when_no_pipelines(self): + """When there are no pipelines, camera_fps and inference_fps should be None.""" + camera_fps_values = [] + inference_fps_values = [] + stream_count = 0 + + result = { + "camera_fps": ( + sum(camera_fps_values) / len(camera_fps_values) + if camera_fps_values + else None + ), + "inference_fps": ( + sum(inference_fps_values) / len(inference_fps_values) + if inference_fps_values + else None + ), + "stream_count": stream_count, + } + + assert result["camera_fps"] is None + assert result["inference_fps"] is None + assert result["stream_count"] == 0 + + def test_stats_aggregates_single_pipeline(self): + """Single pipeline should return its values directly.""" + report = _full_report(inference_throughput=15.0) + + camera_fps_values = [] + inference_fps_values = [] + + throughput = report.get("inference_throughput", 0.0) + if throughput and throughput > 0: + inference_fps_values.append(throughput) + for src in report.get("sources_metadata", []): + props = src.get("source_properties") or {} + fps = props.get("fps") + if fps and fps > 0: + camera_fps_values.append(fps) + + result = { + "camera_fps": ( + sum(camera_fps_values) / len(camera_fps_values) + if camera_fps_values + else None + ), + "inference_fps": ( + sum(inference_fps_values) / len(inference_fps_values) + if inference_fps_values + else None + ), + "stream_count": 1, + } + + assert result["camera_fps"] == 30.0 + assert result["inference_fps"] == 15.0 + assert result["stream_count"] == 1 + + def test_stats_averages_multiple_pipelines(self): + """Multiple pipelines should have their FPS values averaged.""" + reports = [ + _full_report(inference_throughput=10.0), + _full_report(inference_throughput=20.0), + ] + # Override the second pipeline's camera fps + reports[1]["sources_metadata"][0]["source_properties"]["fps"] = 60.0 + + camera_fps_values = [] + inference_fps_values = [] + for report in reports: + throughput = report.get("inference_throughput", 0.0) + if throughput and throughput > 0: + inference_fps_values.append(throughput) + for src in report.get("sources_metadata", []): + props = src.get("source_properties") or {} + fps = props.get("fps") + if fps and fps > 0: + camera_fps_values.append(fps) + + result = { + "camera_fps": ( + sum(camera_fps_values) / len(camera_fps_values) + if camera_fps_values + else None + ), + "inference_fps": ( + sum(inference_fps_values) / len(inference_fps_values) + if inference_fps_values + else None + ), + "stream_count": 2, + } + + assert result["camera_fps"] == 45.0 # (30 + 60) / 2 + assert result["inference_fps"] == 15.0 # (10 + 20) / 2 + assert result["stream_count"] == 2 + + def test_stats_skips_zero_fps(self): + """Pipelines with zero throughput or FPS should be excluded from averages.""" + report = _full_report(inference_throughput=0.0) + report["sources_metadata"][0]["source_properties"]["fps"] = 0 + + camera_fps_values = [] + inference_fps_values = [] + throughput = report.get("inference_throughput", 0.0) + if throughput and throughput > 0: + inference_fps_values.append(throughput) + for src in report.get("sources_metadata", []): + props = src.get("source_properties") or {} + fps = props.get("fps") + if fps and fps > 0: + camera_fps_values.append(fps) + + result = { + "camera_fps": ( + sum(camera_fps_values) / len(camera_fps_values) + if camera_fps_values + else None + ), + "inference_fps": ( + sum(inference_fps_values) / len(inference_fps_values) + if inference_fps_values + else None + ), + "stream_count": 1, + } + + assert result["camera_fps"] is None + assert result["inference_fps"] is None + + +# --------------------------------------------------------------------------- +# LATEST_FRAME pipeline manager handler tests +# --------------------------------------------------------------------------- + + +class TestLatestFrameHandler: + """Tests for InferencePipelineManager._handle_latest_frame.""" + + @pytest.mark.timeout(30) + def test_latest_frame_returns_null_when_buffer_is_empty(self): + """When buffer is empty, frame_data should be None.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + manager._buffer_sink = InMemoryBufferSink(queue_size=10) + + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + assert status_1[0] == "1" + assert status_1[1]["status"] == OperationStatus.SUCCESS + assert status_1[1]["frame_data"] is None + + @pytest.mark.timeout(30) + def test_latest_frame_returns_null_when_buffer_sink_is_none(self): + """When buffer sink is None, frame_data should be None.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + # buffer_sink is None by default + + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + assert status_1[0] == "1" + assert status_1[1]["status"] == OperationStatus.SUCCESS + assert status_1[1]["frame_data"] is None + + @pytest.mark.timeout(30) + def test_latest_frame_returns_encoded_frame(self): + """When buffer has frames, should return base64-encoded JPEG.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + buffer_sink = InMemoryBufferSink(queue_size=10) + test_image = np.zeros((100, 100, 3), dtype=np.uint8) + test_image[50, 50] = [255, 0, 0] + frame_ts = datetime(2024, 1, 1, 12, 0, 0) + buffer_sink.on_prediction( + predictions=[{"test": "value"}], + video_frame=[ + VideoFrame( + image=test_image, + frame_id=42, + frame_timestamp=frame_ts, + source_id=7, + ), + ], + ) + manager._buffer_sink = buffer_sink + + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + assert status_1[0] == "1" + assert status_1[1]["status"] == OperationStatus.SUCCESS + assert status_1[1]["frame_data"] is not None + assert status_1[1]["frame_id"] == 42 + assert status_1[1]["source_id"] == 7 + + # Verify the base64 data decodes to a valid JPEG + decoded = base64.b64decode(status_1[1]["frame_data"]) + assert decoded[:2] == b"\xff\xd8" # JPEG magic bytes + + @pytest.mark.timeout(30) + def test_latest_frame_is_non_destructive(self): + """Reading latest frame should not consume the buffer.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + buffer_sink = InMemoryBufferSink(queue_size=10) + test_image = np.zeros((50, 50, 3), dtype=np.uint8) + buffer_sink.on_prediction( + predictions=[{"test": "value"}], + video_frame=[ + VideoFrame( + image=test_image, + frame_id=1, + frame_timestamp=datetime.now(), + source_id=0, + ), + ], + ) + manager._buffer_sink = buffer_sink + + # Request latest frame twice -- both should succeed + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("3", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + status_2 = responses_queue.get() + assert status_1[1]["frame_data"] is not None + assert status_2[1]["frame_data"] is not None + # Buffer should still have its entry + assert not buffer_sink.empty() + + @pytest.mark.timeout(30) + def test_latest_frame_skips_none_frames(self): + """When the frame list contains None entries, should find the last non-None.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + buffer_sink = InMemoryBufferSink(queue_size=10) + test_image = np.zeros((50, 50, 3), dtype=np.uint8) + buffer_sink.on_prediction( + predictions=[None, {"test": "value"}, None], + video_frame=[ + None, + VideoFrame( + image=test_image, + frame_id=99, + frame_timestamp=datetime.now(), + source_id=5, + ), + None, + ], + ) + manager._buffer_sink = buffer_sink + + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + assert status_1[1]["status"] == OperationStatus.SUCCESS + assert status_1[1]["frame_data"] is not None + assert status_1[1]["frame_id"] == 99 + assert status_1[1]["source_id"] == 5 + + @pytest.mark.timeout(30) + def test_latest_frame_returns_null_when_all_frames_are_none(self): + """When all frames in the buffer entry are None, return null frame_data.""" + command_queue, responses_queue = Queue(), Queue() + manager = InferencePipelineManager( + pipeline_id="my_pipeline", + command_queue=command_queue, + responses_queue=responses_queue, + ) + buffer_sink = InMemoryBufferSink(queue_size=10) + buffer_sink.on_prediction( + predictions=[None, None], + video_frame=[None, None], + ) + manager._buffer_sink = buffer_sink + + command_queue.put(("1", {"type": CommandType.LATEST_FRAME})) + command_queue.put(("2", {"type": CommandType.TERMINATE})) + + manager.run() + + status_1 = responses_queue.get() + assert status_1[1]["status"] == OperationStatus.SUCCESS + assert status_1[1]["frame_data"] is None