Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions inference/core/interfaces/http/http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@
InferencePipelineStatusResponse,
InitializeWebRTCPipelineResponse,
InitializeWebRTCResponse,
LatestFrameResponse,
ListPipelinesResponse,
)
from inference.core.interfaces.stream_manager.api.stream_manager_client import (
Expand Down Expand Up @@ -1893,6 +1894,64 @@ async def consume(
excluded_fields=request.excluded_fields,
)

@app.get(
    "/stats",
    summary="Aggregated pipeline statistics",
    description=(
        "Returns the average camera FPS and average inference FPS across "
        "all running inference pipelines, together with the number of "
        "pipelines currently managed by the stream manager."
    ),
)
@with_route_exceptions_async
async def get_stats():
    """Aggregate camera FPS, inference FPS and stream count over all pipelines.

    Returns:
        dict with keys ``camera_fps`` and ``inference_fps`` (arithmetic
        means, or ``None`` when no samples were collected) and
        ``stream_count`` (number of registered pipelines).
    """
    # Local import keeps this endpoint self-contained.
    import asyncio

    stream_count = 0
    camera_fps_values = []
    inference_fps_values = []
    if self.stream_manager_client is not None:
        try:
            pipelines_resp = await self.stream_manager_client.list_pipelines()
            pipeline_ids = pipelines_resp.pipelines
            stream_count = len(pipeline_ids)
            # Query all pipeline statuses concurrently; with
            # return_exceptions=True a failure for one pipeline does not
            # prevent aggregating stats from the remaining ones.
            status_responses = await asyncio.gather(
                *(
                    self.stream_manager_client.get_status(pid)
                    for pid in pipeline_ids
                ),
                return_exceptions=True,
            )
            for status_resp in status_responses:
                if isinstance(status_resp, Exception):
                    # Skip pipelines whose status could not be retrieved.
                    continue
                report = status_resp.report
                throughput = report.get("inference_throughput", 0.0)
                if throughput and throughput > 0:
                    inference_fps_values.append(throughput)
                for src in report.get("sources_metadata", []):
                    props = src.get("source_properties") or {}
                    fps = props.get("fps")
                    if fps and fps > 0:
                        camera_fps_values.append(fps)
        except Exception:
            # Best-effort endpoint: stream-manager errors degrade to the
            # empty-stats response instead of failing the request.
            pass
    return {
        "camera_fps": (
            sum(camera_fps_values) / len(camera_fps_values)
            if camera_fps_values
            else None
        ),
        "inference_fps": (
            sum(inference_fps_values) / len(inference_fps_values)
            if inference_fps_values
            else None
        ),
        "stream_count": stream_count,
    }

@app.get(
    "/inference_pipelines/{pipeline_id}/latest_frame",
    response_model=LatestFrameResponse,
    summary="[EXPERIMENTAL] Get latest frame from InferencePipeline",
)
@with_route_exceptions_async
async def latest_frame(pipeline_id: str) -> LatestFrameResponse:
    """Fetch the most recent frame produced by the given pipeline."""
    response = await self.stream_manager_client.get_latest_frame(
        pipeline_id=pipeline_id
    )
    return response

class ModelInitState:
"""Class to track model initialization state."""

Expand Down
9 changes: 9 additions & 0 deletions inference/core/interfaces/stream_manager/api/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ class ConsumePipelineResponse(CommandResponse):
frames_metadata: List[FrameMetadata]


class LatestFrameResponse(CommandResponse):
    """Response carrying the most recent frame grabbed from a pipeline.

    All payload fields are ``None`` when no frame is available yet.
    """

    frame_data: Optional[str] = Field(
        default=None, description="Base64-encoded JPEG image"
    )
    frame_id: Optional[int] = Field(
        default=None, description="Identifier of the returned frame"
    )
    frame_timestamp: Optional[datetime] = Field(
        default=None, description="Timestamp associated with the frame"
    )
    source_id: Optional[int] = Field(
        default=None, description="Numeric identifier of the video source"
    )


class InitializeWebRTCPipelineResponse(CommandResponse):
sdp: str
type: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
FrameMetadata,
InferencePipelineStatusResponse,
InitializeWebRTCPipelineResponse,
LatestFrameResponse,
ListPipelinesResponse,
)
from inference.core.interfaces.stream_manager.api.errors import (
Expand Down Expand Up @@ -200,6 +201,26 @@ async def consume_pipeline_result(
],
)

async def get_latest_frame(self, pipeline_id: str) -> LatestFrameResponse:
    """Ask the stream manager for the latest frame of ``pipeline_id``."""
    response = await self._handle_command(
        command={
            TYPE_KEY: CommandType.LATEST_FRAME,
            PIPELINE_ID_KEY: pipeline_id,
        }
    )
    payload = response[RESPONSE_KEY]
    return LatestFrameResponse(
        status=payload[STATUS_KEY],
        context=CommandContext(
            request_id=response.get(REQUEST_ID_KEY),
            pipeline_id=response.get(PIPELINE_ID_KEY),
        ),
        frame_data=payload.get("frame_data"),
        frame_id=payload.get("frame_id"),
        frame_timestamp=payload.get("frame_timestamp"),
        source_id=payload.get("source_id"),
    )

async def _handle_command(self, command: dict) -> dict:
response = await send_command(
host=self._host,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class CommandType(str, Enum):
TERMINATE = "terminate"
LIST_PIPELINES = "list_pipelines"
CONSUME_RESULT = "consume_result"
LATEST_FRAME = "latest_frame"


class VideoConfiguration(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import base64
import json
import os
import signal
Expand Down Expand Up @@ -148,6 +149,8 @@ def _handle_command(self, request_id: str, payload: dict) -> None:
return self._get_pipeline_status(request_id=request_id)
if command_type is CommandType.CONSUME_RESULT:
return self._consume_results(request_id=request_id, payload=payload)
if command_type is CommandType.LATEST_FRAME:
return self._handle_latest_frame(request_id=request_id)
raise NotImplementedError(
f"Command type `{command_type}` cannot be handled"
)
Expand Down Expand Up @@ -636,6 +639,56 @@ def _consume_results(self, request_id: str, payload: dict) -> None:
error_type=ErrorType.OPERATION_ERROR,
)

def _handle_latest_frame(self, request_id: str) -> None:
    """Respond with the newest buffered frame, JPEG-encoded as base64.

    Puts a SUCCESS response with all payload fields set to ``None`` on the
    responses queue when no frame is available; encoding failures and any
    unexpected errors are routed through ``_handle_error``.

    Args:
        request_id: Identifier of the command this response answers.
    """
    # Shared payload for the "no frame available" success case.
    empty_payload = {
        STATUS_KEY: OperationStatus.SUCCESS,
        "frame_data": None,
        "frame_id": None,
        "frame_timestamp": None,
        "source_id": None,
    }
    try:
        if self._buffer_sink is None or self._buffer_sink.empty():
            self._responses_queue.put((request_id, empty_payload))
            return None
        # NOTE(review): peeking at the sink's private `_buffer` couples this
        # code to the sink's internals — consider exposing a public accessor.
        # Peek at the last item in the buffer (non-destructive).
        _predictions, frames = self._buffer_sink._buffer[-1]
        # Use the last non-None VideoFrame from the newest buffer entry.
        frame = next((f for f in reversed(frames) if f is not None), None)
        if frame is None:
            self._responses_queue.put((request_id, empty_payload))
            return None
        success, jpeg_bytes = cv.imencode(
            ".jpg", frame.image, [cv.IMWRITE_JPEG_QUALITY, 70]
        )
        # cv.imencode signals failure via its first return value; the
        # original code discarded it and could base64 an invalid buffer.
        if not success:
            raise RuntimeError("Failed to encode frame as JPEG")
        response_payload = {
            STATUS_KEY: OperationStatus.SUCCESS,
            "frame_data": base64.b64encode(jpeg_bytes.tobytes()).decode("ascii"),
            "frame_id": frame.frame_id,
            "frame_timestamp": frame.frame_timestamp.isoformat(),
            "source_id": frame.source_id,
        }
        self._responses_queue.put((request_id, response_payload))
    except Exception as error:
        self._handle_error(
            request_id=request_id,
            error=error,
            public_error_message="Unexpected error retrieving latest frame.",
            error_type=ErrorType.OPERATION_ERROR,
        )

def _handle_error(
self,
request_id: str,
Expand Down
Loading
Loading