Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions gateway/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,11 @@
# Controls PLR0913
max-args = 9

[tool.ruff.lint.per-file-ignores]
"sds_gateway/api_methods/tests/test_composite_capture_serialization.py" = [
"PLR2004",
]

[tool.ruff.format]
indent-style = "space"
line-ending = "auto"
Expand Down
25 changes: 3 additions & 22 deletions gateway/sds_gateway/api_methods/helpers/search_captures.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
)
from sds_gateway.api_methods.utils.metadata_schemas import infer_index_name
from sds_gateway.api_methods.utils.opensearch_client import get_opensearch_client
from sds_gateway.api_methods.utils.relationship_utils import (
group_captures_by_top_level_dir,
)
from sds_gateway.users.models import User

RangeValue = dict[str, int | float]
Expand Down Expand Up @@ -341,27 +344,6 @@ def _build_os_query_for_captures(
return query


def group_captures_by_top_level_dir(
    captures: QuerySet[Capture],
) -> dict[str, list[Capture]]:
    """Group captures by top_level_dir for composite capture handling.

    Args:
        captures: QuerySet (or any iterable) of Capture objects.
    Returns:
        dict: {top_level_dir: list of captures}, preserving the input
            iteration order within each group.
    """
    grouped_captures: dict[str, list[Capture]] = {}

    for capture in captures:
        # setdefault replaces the manual membership check + empty-list init.
        grouped_captures.setdefault(capture.top_level_dir, []).append(capture)

    return grouped_captures


# TODO: add pagination before retrieval rather than after
# Need to paginate/limit OpenSearch results list before grouping
# and then paginate/limit the grouped captures
Expand All @@ -376,7 +358,6 @@ def get_composite_captures(
Returns:
list: List of composite capture data
"""

grouped_captures = group_captures_by_top_level_dir(captures)
composite_captures = []

Expand Down
66 changes: 35 additions & 31 deletions gateway/sds_gateway/api_methods/helpers/temporal_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,17 @@ def drf_rf_filename_from_ms(ms: int) -> str:
return f"rf@{ms // 1000}.{ms % 1000:03d}.h5"


def _catch_capture_type_error(capture_type: CaptureType) -> None:
def _catch_value_errors(capture_type: CaptureType, capture: Capture) -> None:
    """Validate inputs for temporal filtering, raising ValueError on failure.

    Args:
        capture_type: The type of the capture being filtered.
        capture: The capture under consideration.
            NOTE(review): `capture` is not used in the visible body — confirm
            whether additional checks were intended or the parameter is vestigial.
    Raises:
        ValueError: If capture_type is not DigitalRF (the only type supported
            for temporal filtering).
    """
    if capture_type != CaptureType.DigitalRF:
        msg = "Only DigitalRF captures are supported for temporal filtering."
        log.error(msg)
        raise ValueError(msg)


def _filter_capture_data_files_selection_bounds(
    capture: Capture,
    start_time: int,  # relative ms from start of capture (from UI)
    end_time: int,  # relative ms from start of capture (from UI)
) -> QuerySet[File]:
    """Filter the capture file selection bounds to the given start and end times."""
    # A capture without an indexed start_time cannot anchor relative offsets.
    if capture.start_time is None:
        msg = f"Capture {capture.uuid} has no indexed start_time for temporal filtering"
        raise ValueError(msg)

    # Translate relative UI offsets into absolute epoch milliseconds.
    base_ms = capture.start_time * 1000
    lower_name = drf_rf_filename_from_ms(base_ms + start_time)
    upper_name = drf_rf_filename_from_ms(base_ms + end_time)

    # DRF filenames sort lexicographically by timestamp, so a name-range
    # filter selects exactly the files inside the time window.
    return (
        capture.get_drf_data_files_queryset()
        .filter(name__gte=lower_name, name__lte=upper_name)
        .order_by("name")
    )


def get_capture_files_with_temporal_filter(
capture_type: CaptureType,
Expand All @@ -60,24 +41,47 @@ def get_capture_files_with_temporal_filter(
end_time: int | None = None,
) -> QuerySet[File]:
"""Get the capture files with temporal filtering."""
_catch_capture_type_error(capture_type)
_catch_value_errors(capture_type, capture)

capture_files = get_capture_files(capture)

if start_time is None or end_time is None:
log.warning(
"Start or end time is None; returning all capture files without "
"temporal filtering"
)
return get_capture_files(capture)
return capture_files

# get non-data files
non_data_files = get_capture_files(capture).exclude(
name__regex=DRF_RF_FILENAME_REGEX_STR
)
epoch_start_ms = capture.start_time * 1000
start_ms = epoch_start_ms + start_time
end_ms = epoch_start_ms + end_time

# get data files with temporal filtering
data_files = _filter_capture_data_files_selection_bounds(
capture, start_time, end_time
return filter_files_by_temporal_bounds(
capture_files,
start_ms,
end_ms,
)


def filter_files_by_temporal_bounds(
    files: QuerySet[File],
    start_time: int,  # absolute epoch milliseconds
    end_time: int,  # absolute epoch milliseconds
) -> QuerySet[File]:
    """Filter files by temporal bounds.

    Non-data files (those not matching the DRF RF filename pattern) are
    always kept; data files are restricted to the [start_time, end_time]
    window by comparing their timestamp-derived filenames.

    Args:
        files: QuerySet of File objects belonging to a capture.
        start_time: Window start in epoch milliseconds.
        end_time: Window end in epoch milliseconds.
    Returns:
        QuerySet[File]: union of all non-data files and the data files
            whose names fall within the bounds.
    """
    # Split the set: non-data files bypass temporal filtering entirely.
    non_data_files = files.exclude(name__regex=DRF_RF_FILENAME_REGEX_STR)

    unfiltered_data_files = files.filter(name__regex=DRF_RF_FILENAME_REGEX_STR)

    # DRF filenames encode the timestamp, so a lexicographic name range
    # is equivalent to a time-range filter.
    start_file_name = drf_rf_filename_from_ms(start_time)
    end_file_name = drf_rf_filename_from_ms(end_time)

    filtered_data_files = unfiltered_data_files.filter(
        name__gte=start_file_name,
        name__lte=end_file_name,
    ).order_by("name")

    # NOTE(review): Django discards per-queryset ordering inside union();
    # callers needing ordered results should re-sort — confirm downstream use.
    return non_data_files.union(filtered_data_files)
Loading
Loading