222 changes: 219 additions & 3 deletions lisa/notifiers/perfevaluation/perfevaluation.py
@@ -4,7 +4,7 @@
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, cast
from typing import Any, Dict, List, Optional, Tuple, Type, cast

import yaml
from dataclasses_json import dataclass_json
@@ -23,6 +23,8 @@ class PerfEvaluationSchema(schema.Notifier):
output_file: Optional[str] = None
statistics_times: Optional[int] = None
fail_test_on_performance_failure: bool = False
# Allowed post-peak throughput drop ratio, e.g. 0.02 means 2% drop allowed.
post_peak_drop_ratio: float = 0.02
Comment on lines +26 to +27

Copilot AI Apr 21, 2026

The PR description template is still empty (Description/Related Issue/Type of Change/Checklist). Please fill it in so reviewers understand the intent and validation plan for the new PerfEvaluation behavior.

@dataclass
@@ -199,6 +201,14 @@ def __init__(self, runbook: schema.TypedSchema) -> None:
self._failed_metrics: Dict[str, List[Dict[str, Any]]] = {}
self._pending_messages: Dict[Any, List[messages.UnifiedPerfMessage]] = {}
self._perf_runs_cache: Dict[Any, List[messages.UnifiedPerfMessage]] = {}
self._az_cap_cache: Dict[str, Any] = {}
self._post_peak_drop_ratio: float = 0.02
# key: (test_case_name, vmsize, base_metric_name)
# base_metric_name = metric name with _conn_N suffix stripped
self._bw_peak_tracker: Dict[
Tuple[str, str, str],
Dict[str, Any],
] = {}
plugin_manager.register(self)

@classmethod
@@ -211,6 +221,11 @@ def type_schema(cls) -> Type[schema.TypedSchema]:

def _initialize(self, *args: Any, **kwargs: Any) -> None:
runbook = cast(PerfEvaluationSchema, self.runbook)
# Clamp to [0, 1) to avoid invalid values breaking the threshold formula.
self._post_peak_drop_ratio = min(
max(runbook.post_peak_drop_ratio, 0.0),
0.999,
)
if runbook.criteria:
self._load_criteria_from_dict(runbook.criteria)
elif runbook.criteria_file:
@@ -756,6 +771,7 @@ def _evaluate_pending_messages_for_test(self, test_case_name: str) -> None:
message_key, messages_list, statistics_type
)
del self._pending_messages[message_key]
self._finalize_bw_peak_checks(test_case_name)

def _aggregate_and_evaluate(
self,
@@ -821,6 +837,26 @@ def _evaluate_performance_message(
f"[PerfEvaluate] Evaluating {test_case_name}.{metric_name} "
f"value={actual_value} (VM: {vm_size})"
)
# Step 1: if this is a throughput metric and SKU bandwidth is available,
# use the peak-based check exclusively — skip perf_criteria entirely.
throughput_prefixes = (
"throughput_in_gbps",
"tx_throughput_in_gbps",
"rx_throughput_in_gbps",
)
if (
any(metric_name.startswith(p) for p in throughput_prefixes)
and perf_message.metric_relativity
== messages.MetricRelativity.HigherIsBetter
):
bw_spec = self._lookup_az_max_bandwidth(
perf_message.location, perf_message.vmsize or ""
)
if bw_spec is not None:
self._update_bw_peak(perf_message, metric_name, actual_value, bw_spec)
return

Comment on lines +840 to +858

Copilot AI Apr 21, 2026

The SKU-based throughput path returns early after _update_bw_peak(...) and does not record a passing evaluation result. If all perf metrics in a run are throughput metrics and they pass, _evaluation_results can remain empty and finalize() will log "No performance evaluations performed." Consider appending a pass result per throughput metric (and/or for the SKU 90% check) so reports/JSON output remain consistent with the non-SKU criteria path.
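
One possible shape for such a pass result, sketched from the result-dict keys already used elsewhere in this diff; it assumes the method-local names perf_message, metric_name, actual_value, bw_spec, and test_case_name from the surrounding code and is illustrative only, not code from the PR:

# Sketch: record a passing entry before the early return so finalize()
# still sees an evaluation when a run contains only throughput metrics.
passing_result: Dict[str, Any] = {
    "timestamp": str(perf_message.time) if perf_message.time else "",
    "test_case_name": test_case_name,
    "metric_name": metric_name,
    "metric_value": actual_value,
    "metric_unit": perf_message.metric_unit,
    "metric_relativity": (
        perf_message.metric_relativity.value
        if perf_message.metric_relativity
        else "NA"
    ),
    "tool": perf_message.tool,
    "platform": perf_message.platform,
    "vmsize": perf_message.vmsize,
    "role": perf_message.role,
    "criteria_defined": True,
    "criteria_met": True,
    "evaluation_message": (
        f"Tracked {metric_name}={actual_value:.3f} Gbps against SKU max "
        f"{bw_spec:.3f} Gbps; peak and drop verdicts are reported at finalize"
    ),
}
self._evaluation_results.append(passing_result)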
# Step 2: fall back to perf_criteria.yml
metric_criteria = self._get_criteria_for_test(
test_case_name, vm_size, metric_name
)
@@ -893,6 +929,180 @@ def _evaluate_performance_message(
self._log.debug(f"{msg} {unit_info} (VM: {vm_size}, value: {actual_value})")
self._evaluation_results.append(evaluation_result)

def _base_metric_name(self, metric_name: str) -> str:
"""Strip _conn_N (and optional _buffer_N) suffix to get the base metric."""
return re.sub(r"_conn_\d+(_buffer_\d+)?$", "", metric_name)

def _update_bw_peak(
self,
perf_message: messages.UnifiedPerfMessage,
metric_name: str,
value: float,
sku_bw: float,
) -> None:
"""Track throughput per connection and enforce two rules:

1. Post-peak 2% rule: once the running maximum starts declining,
every subsequent value must be >= peak * (1 - drop).
2. SKU 90% rule (checked at finalize): the overall peak must reach
at least 90% of the SKU's MaxNetworkBandwidthGbps.
"""
test_case_name = perf_message.test_case_name
vm_size = perf_message.vmsize or "unknown"
base = self._base_metric_name(metric_name)
tracker_key: Tuple[str, str, str] = (test_case_name, vm_size, base)

if tracker_key not in self._bw_peak_tracker:
self._bw_peak_tracker[tracker_key] = {
"sku_bw": sku_bw,
"peak_value": 0.0,
"peak_passed": False, # True once a value drops below current peak
"last_msg": perf_message,
}
state = self._bw_peak_tracker[tracker_key]
state["last_msg"] = perf_message

if value >= state["peak_value"]:
# Still climbing (or equal) — update the running max
state["peak_value"] = value
else:
# Value dropped below current peak — we are past the maximum
state["peak_passed"] = True

if state["peak_passed"]:
peak = state["peak_value"]
min_allowed = peak * (1 - self._post_peak_drop_ratio)
if value < min_allowed:
drop_pct = (1 - value / peak) * 100
allowed_drop_pct = self._post_peak_drop_ratio * 100
eval_msg = (
f"\u2717 {metric_name}: {value:.3f} Gbps dropped "
f"{drop_pct:.1f}% from peak {peak:.3f} Gbps "
f"(allowed: within {allowed_drop_pct:.1f}%)"
)
self._log.info(
f"[BwPeak] Drop detected {test_case_name}.{metric_name} "
f"(VM: {vm_size}): {eval_msg}"
)
result: Dict[str, Any] = {
"timestamp": (str(perf_message.time) if perf_message.time else ""),
"test_case_name": test_case_name,
"metric_name": metric_name,
"metric_value": value,
"metric_unit": perf_message.metric_unit,
"metric_relativity": (
perf_message.metric_relativity.value
if perf_message.metric_relativity
else "NA"
),
"tool": perf_message.tool,
"platform": perf_message.platform,
"vmsize": perf_message.vmsize,
"role": perf_message.role,
"criteria_defined": True,
"criteria_met": False,
"evaluation_message": eval_msg,
}
self._evaluation_results.append(result)
failed: Dict[str, Any] = {
"metric_name": metric_name,
"actual_value": value,
"unit": perf_message.metric_unit,
"evaluation_message": eval_msg,
"vm_size": vm_size,
}
self._failed_metrics.setdefault(test_case_name, []).append(failed)

def _finalize_bw_peak_checks(self, test_case_name: str) -> None:
"""Check SKU 90% rule and clear tracker state."""
keys_to_remove: List[Tuple[str, str, str]] = []
for tracker_key, state in self._bw_peak_tracker.items():
tc, vm_size, base_metric = tracker_key
if tc != test_case_name:
continue
peak = state["peak_value"]
sku_bw = state["sku_bw"]
required = sku_bw * 0.9
if peak < required:
eval_msg = (
f"\u2717 {base_metric}: peak {peak:.3f} Gbps did not reach "
f"90% of SKU max {sku_bw:.3f} Gbps "
f"(required >= {required:.3f} Gbps)"
)
self._log.info(
f"[BwPeak] SKU 90% check failed for "
f"{tc}.{base_metric} (VM: {vm_size}): {eval_msg}"
)
last_msg = state["last_msg"]
result: Dict[str, Any] = {
"timestamp": (str(last_msg.time) if last_msg.time else ""),
"test_case_name": tc,
"metric_name": base_metric,
"metric_value": peak,
"metric_unit": last_msg.metric_unit,
"metric_relativity": (
last_msg.metric_relativity.value
if last_msg.metric_relativity
else "NA"
),
"tool": last_msg.tool,
"platform": last_msg.platform,
"vmsize": last_msg.vmsize,
"role": last_msg.role,
"criteria_defined": True,
"criteria_met": False,
"evaluation_message": eval_msg,
}
self._evaluation_results.append(result)
failed: Dict[str, Any] = {
"metric_name": base_metric,
"actual_value": peak,
"unit": last_msg.metric_unit,
"evaluation_message": eval_msg,
"vm_size": vm_size,
}
self._failed_metrics.setdefault(tc, []).append(failed)
else:
self._log.info(
f"[BwPeak] SKU 90% check passed for "
f"{tc}.{base_metric} (VM: {vm_size}): "
f"peak {peak:.3f} >= {required:.3f} Gbps"
)
keys_to_remove.append(tracker_key)

for tracker_key in keys_to_remove:
del self._bw_peak_tracker[tracker_key]

def _lookup_az_max_bandwidth(self, location: str, vmsize: str) -> Optional[float]:
"""Read MaxNetworkBandwidthGbps from the Azure SKU cache file.

Results are cached in memory so the file is only read once per location.
Nothing is sent as a message, so nothing ends up in the database.
"""
if not location or not vmsize:
return None
if location not in self._az_cap_cache:
cache_path = constants.CACHE_PATH / f"azure_locations_{location}.json"
if not cache_path.exists():
self._az_cap_cache[location] = {}
else:
try:
with open(cache_path, "r", encoding="utf-8") as f:
self._az_cap_cache[location] = json.load(f)
except Exception:
Copilot AI Apr 21, 2026

_lookup_az_max_bandwidth catches Exception and silently falls back to {} on any failure reading/parsing the cache file. This can mask real issues (e.g., JSON corruption) and make troubleshooting difficult. Prefer catching specific exceptions (e.g., OSError, json.JSONDecodeError, ValueError) and logging the exception (at least debug) with the cache path before falling back.

Suggested change:
except Exception:
except (OSError, json.JSONDecodeError, ValueError) as e:
    self._log.debug(
        f"Failed to read Azure capability cache from {cache_path}: "
        f"{e}. Verify the cache file exists, is readable, and "
        "contains valid JSON. Falling back to an empty cache."
    )
self._az_cap_cache[location] = {}
caps = (
self._az_cap_cache[location]
.get("capabilities", {})
.get(vmsize, {})
.get("resource_sku", {})
.get("capabilities", [])
)
for cap in caps:
if cap.get("name") == "MaxNetworkBandwidthGbps":
return float(cap["value"])
return None

def finalize(self) -> None:
if not self._evaluation_results:
self._log.info("No performance evaluations performed.")
@@ -967,9 +1177,15 @@ def _modify_message(self, message: messages.MessageBase) -> None:
for failed_key in self._failed_metrics.keys():
if failed_key in message.full_name or failed_key in message.name:
failed_metrics = self._failed_metrics[failed_key]
detail_parts: List[str] = []
for item in failed_metrics:
metric = item.get("metric_name", "unknown_metric")
reason = item.get("evaluation_message", "no reason")
detail_parts.append(f"{metric}: {reason}")
detail_msg = "; ".join(detail_parts)
summary_msg = (
f"Performance evaluation failed: {len(failed_metrics)} "
f"metric(s) did not meet criteria"
f"metric(s) did not meet criteria. Details: {detail_msg}"
)
message.perf_evaluation_summary = {
"failed_metrics_count": len(failed_metrics),
Expand All @@ -984,7 +1200,7 @@ def _modify_message(self, message: messages.MessageBase) -> None:
original_message = message.message or ""
perf_summary = (
f"Performance criteria failed: {len(failed_metrics)} "
f"metrics"
f"metrics. {detail_msg}"
)
message.status = TestStatus.FAILED
if original_message: