diff --git a/lisa/notifiers/perfevaluation/perfevaluation.py b/lisa/notifiers/perfevaluation/perfevaluation.py
index 2d0fabb3c4..e760593120 100644
--- a/lisa/notifiers/perfevaluation/perfevaluation.py
+++ b/lisa/notifiers/perfevaluation/perfevaluation.py
@@ -4,7 +4,7 @@
 import uuid
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, Dict, List, Optional, Type, cast
+from typing import Any, Dict, List, Optional, Tuple, Type, cast
 
 import yaml
 from dataclasses_json import dataclass_json
@@ -23,6 +23,8 @@ class PerfEvaluationSchema(schema.Notifier):
     output_file: Optional[str] = None
     statistics_times: Optional[int] = None
     fail_test_on_performance_failure: bool = False
+    # Allowed post-peak throughput drop ratio, e.g. 0.02 means 2% drop allowed.
+    post_peak_drop_ratio: float = 0.02
 
 
 @dataclass
@@ -199,6 +201,14 @@ def __init__(self, runbook: schema.TypedSchema) -> None:
         self._failed_metrics: Dict[str, List[Dict[str, Any]]] = {}
         self._pending_messages: Dict[Any, List[messages.UnifiedPerfMessage]] = {}
         self._perf_runs_cache: Dict[Any, List[messages.UnifiedPerfMessage]] = {}
+        self._az_cap_cache: Dict[str, Any] = {}
+        self._post_peak_drop_ratio: float = 0.02
+        # key: (test_case_name, vmsize, base_metric_name)
+        # base_metric_name = metric name with _conn_N suffix stripped
+        self._bw_peak_tracker: Dict[
+            Tuple[str, str, str],
+            Dict[str, Any],
+        ] = {}
         plugin_manager.register(self)
 
     @classmethod
@@ -211,6 +221,11 @@ def type_schema(cls) -> Type[schema.TypedSchema]:
 
     def _initialize(self, *args: Any, **kwargs: Any) -> None:
         runbook = cast(PerfEvaluationSchema, self.runbook)
+        # Clamp to [0, 1) to avoid invalid values breaking the threshold formula.
+        self._post_peak_drop_ratio = min(
+            max(runbook.post_peak_drop_ratio, 0.0),
+            0.999,
+        )
         if runbook.criteria:
             self._load_criteria_from_dict(runbook.criteria)
         elif runbook.criteria_file:
@@ -756,6 +771,7 @@ def _evaluate_pending_messages_for_test(self, test_case_name: str) -> None:
                 message_key, messages_list, statistics_type
             )
             del self._pending_messages[message_key]
+        self._finalize_bw_peak_checks(test_case_name)
 
     def _aggregate_and_evaluate(
         self,
@@ -821,6 +837,26 @@ def _evaluate_performance_message(
             f"[PerfEvaluate] Evaluating {test_case_name}.{metric_name} "
             f"value={actual_value} (VM: {vm_size})"
         )
+        # Step 1: if this is a throughput metric and SKU bandwidth is available,
+        # use the peak-based check exclusively and skip perf_criteria entirely.
+        throughput_prefixes = (
+            "throughput_in_gbps",
+            "tx_throughput_in_gbps",
+            "rx_throughput_in_gbps",
+        )
+        if (
+            any(metric_name.startswith(p) for p in throughput_prefixes)
+            and perf_message.metric_relativity
+            == messages.MetricRelativity.HigherIsBetter
+        ):
+            bw_spec = self._lookup_az_max_bandwidth(
+                perf_message.location, perf_message.vmsize or ""
+            )
+            if bw_spec is not None:
+                self._update_bw_peak(perf_message, metric_name, actual_value, bw_spec)
+                return
+
+        # Step 2: fall back to perf_criteria.yml
         metric_criteria = self._get_criteria_for_test(
             test_case_name, vm_size, metric_name
         )
@@ -893,6 +929,180 @@ def _evaluate_performance_message(
         self._log.debug(f"{msg} {unit_info} (VM: {vm_size}, value: {actual_value})")
         self._evaluation_results.append(evaluation_result)
 
+    def _base_metric_name(self, metric_name: str) -> str:
+        """Strip _conn_N (and optional _buffer_N) suffix to get the base metric."""
+        return re.sub(r"_conn_\d+(_buffer_\d+)?$", "", metric_name)
+
+    def _update_bw_peak(
+        self,
+        perf_message: messages.UnifiedPerfMessage,
+        metric_name: str,
+        value: float,
+        sku_bw: float,
+    ) -> None:
+        """Track throughput across connection counts and enforce two rules:
+
+        1. Post-peak drop rule (default 2%): once the running maximum starts
+           declining, every later value must be >= peak * (1 - drop_ratio).
+        2. SKU 90% rule (checked in _finalize_bw_peak_checks): the overall
+           peak must reach at least 90% of the SKU's MaxNetworkBandwidthGbps.
+        """
+        test_case_name = perf_message.test_case_name
+        vm_size = perf_message.vmsize or "unknown"
+        base = self._base_metric_name(metric_name)
+        tracker_key: Tuple[str, str, str] = (test_case_name, vm_size, base)
+
+        if tracker_key not in self._bw_peak_tracker:
+            self._bw_peak_tracker[tracker_key] = {
+                "sku_bw": sku_bw,
+                "peak_value": 0.0,
+                "peak_passed": False,  # True once a value drops below current peak
+                "last_msg": perf_message,
+            }
+        state = self._bw_peak_tracker[tracker_key]
+        state["last_msg"] = perf_message
+
+        if value >= state["peak_value"]:
+            # Still climbing (or equal); update the running max
+            state["peak_value"] = value
+        else:
+            # Value dropped below current peak; we are past the maximum
+            state["peak_passed"] = True
+
+        if state["peak_passed"]:
+            peak = state["peak_value"]
+            min_allowed = peak * (1 - self._post_peak_drop_ratio)
+            if value < min_allowed:
+                drop_pct = (1 - value / peak) * 100
+                allowed_drop_pct = self._post_peak_drop_ratio * 100
+                eval_msg = (
+                    f"\u2717 {metric_name}: {value:.3f} Gbps dropped "
+                    f"{drop_pct:.1f}% from peak {peak:.3f} Gbps "
+                    f"(allowed: within {allowed_drop_pct:.1f}%)"
+                )
+                self._log.info(
+                    f"[BwPeak] Drop detected {test_case_name}.{metric_name} "
+                    f"(VM: {vm_size}): {eval_msg}"
+                )
+                result: Dict[str, Any] = {
+                    "timestamp": (str(perf_message.time) if perf_message.time else ""),
+                    "test_case_name": test_case_name,
+                    "metric_name": metric_name,
+                    "metric_value": value,
+                    "metric_unit": perf_message.metric_unit,
+                    "metric_relativity": (
+                        perf_message.metric_relativity.value
+                        if perf_message.metric_relativity
+                        else "NA"
+                    ),
+                    "tool": perf_message.tool,
+                    "platform": perf_message.platform,
+                    "vmsize": perf_message.vmsize,
+                    "role": perf_message.role,
+                    "criteria_defined": True,
+                    "criteria_met": False,
+                    "evaluation_message": eval_msg,
+                }
+                self._evaluation_results.append(result)
+                failed: Dict[str, Any] = {
+                    "metric_name": metric_name,
+                    "actual_value": value,
+                    "unit": perf_message.metric_unit,
+                    "evaluation_message": eval_msg,
+                    "vm_size": vm_size,
+                }
+                self._failed_metrics.setdefault(test_case_name, []).append(failed)
+
+    def _finalize_bw_peak_checks(self, test_case_name: str) -> None:
+        """Check SKU 90% rule and clear tracker state."""
+        keys_to_remove: List[Tuple[str, str, str]] = []
+        for tracker_key, state in self._bw_peak_tracker.items():
+            tc, vm_size, base_metric = tracker_key
+            if tc != test_case_name:
+                continue
+            peak = state["peak_value"]
+            sku_bw = state["sku_bw"]
+            required = sku_bw * 0.9
+            if peak < required:
+                eval_msg = (
+                    f"\u2717 {base_metric}: peak {peak:.3f} Gbps did not reach "
+                    f"90% of SKU max {sku_bw:.3f} Gbps "
+                    f"(required >= {required:.3f} Gbps)"
+                )
+                self._log.info(
+                    f"[BwPeak] SKU 90% check failed for "
+                    f"{tc}.{base_metric} (VM: {vm_size}): {eval_msg}"
+                )
+                last_msg = state["last_msg"]
+                result: Dict[str, Any] = {
+                    "timestamp": (str(last_msg.time) if last_msg.time else ""),
+                    "test_case_name": tc,
+                    "metric_name": base_metric,
+                    "metric_value": peak,
+                    "metric_unit": last_msg.metric_unit,
+                    "metric_relativity": (
+                        last_msg.metric_relativity.value
+                        if last_msg.metric_relativity
+                        else "NA"
+                    ),
+                    "tool": last_msg.tool,
+                    "platform": last_msg.platform,
+                    "vmsize": last_msg.vmsize,
+                    "role": last_msg.role,
+                    "criteria_defined": True,
+                    "criteria_met": False,
+                    "evaluation_message": eval_msg,
+                }
+                self._evaluation_results.append(result)
+                failed: Dict[str, Any] = {
+                    "metric_name": base_metric,
+                    "actual_value": peak,
+                    "unit": last_msg.metric_unit,
+                    "evaluation_message": eval_msg,
+                    "vm_size": vm_size,
+                }
+                self._failed_metrics.setdefault(tc, []).append(failed)
+            else:
+                self._log.info(
+                    f"[BwPeak] SKU 90% check passed for "
+                    f"{tc}.{base_metric} (VM: {vm_size}): "
+                    f"peak {peak:.3f} >= {required:.3f} Gbps"
+                )
+            keys_to_remove.append(tracker_key)
+
+        for tracker_key in keys_to_remove:
+            del self._bw_peak_tracker[tracker_key]
+
+    def _lookup_az_max_bandwidth(self, location: str, vmsize: str) -> Optional[float]:
+        """Read MaxNetworkBandwidthGbps from the Azure SKU cache file.
+
+        Results are cached in memory so the file is only read once per location.
+        Nothing is sent as a message, so nothing ends up in the database.
+        """
+        if not location or not vmsize:
+            return None
+        if location not in self._az_cap_cache:
+            cache_path = constants.CACHE_PATH / f"azure_locations_{location}.json"
+            if not cache_path.exists():
+                self._az_cap_cache[location] = {}
+            else:
+                try:
+                    with open(cache_path, "r", encoding="utf-8") as f:
+                        self._az_cap_cache[location] = json.load(f)
+                except Exception:
+                    self._az_cap_cache[location] = {}
+        caps = (
+            self._az_cap_cache[location]
+            .get("capabilities", {})
+            .get(vmsize, {})
+            .get("resource_sku", {})
+            .get("capabilities", [])
+        )
+        for cap in caps:
+            if cap.get("name") == "MaxNetworkBandwidthGbps":
+                return float(cap["value"])
+        return None
+
     def finalize(self) -> None:
         if not self._evaluation_results:
             self._log.info("No performance evaluations performed.")
@@ -967,9 +1177,15 @@ def _modify_message(self, message: messages.MessageBase) -> None:
         for failed_key in self._failed_metrics.keys():
            if failed_key in message.full_name or failed_key in message.name:
                 failed_metrics = self._failed_metrics[failed_key]
+                detail_parts: List[str] = []
+                for item in failed_metrics:
+                    metric = item.get("metric_name", "unknown_metric")
+                    reason = item.get("evaluation_message", "no reason")
+                    detail_parts.append(f"{metric}: {reason}")
+                detail_msg = "; ".join(detail_parts)
                 summary_msg = (
                     f"Performance evaluation failed: {len(failed_metrics)} "
-                    f"metric(s) did not meet criteria"
+                    f"metric(s) did not meet criteria. Details: {detail_msg}"
                 )
                 message.perf_evaluation_summary = {
                     "failed_metrics_count": len(failed_metrics),
@@ -984,7 +1200,7 @@ def _modify_message(self, message: messages.MessageBase) -> None:
                 original_message = message.message or ""
                 perf_summary = (
                     f"Performance criteria failed: {len(failed_metrics)} "
-                    f"metrics"
+                    f"metrics. {detail_msg}"
                 )
                 message.status = TestStatus.FAILED
                 if original_message:
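
The two new rules are easy to lose inside the notifier plumbing, so here is a minimal standalone sketch of the same logic (post-peak drop check and 90%-of-SKU check) run on made-up samples. The sample throughputs, the 10.0 Gbps cap, and the 2% ratio are illustrative assumptions only; the patch itself reads the cap from the Azure SKU cache file and the ratio from the runbook. Note also that the added helpers call re.sub, json.load, and constants.CACHE_PATH; the visible hunks do not show whether those imports already exist in perfevaluation.py.

# Illustrative sketch only; mirrors the patch's checks, not part of perfevaluation.py.
from typing import List, Tuple

POST_PEAK_DROP_RATIO = 0.02  # runbook default in the patch
SKU_MAX_GBPS = 10.0          # assumed MaxNetworkBandwidthGbps for this example


def evaluate(samples: List[float]) -> Tuple[List[str], bool]:
    """Return post-peak drop failures and whether the peak reached 90% of SKU max."""
    failures: List[str] = []
    peak = 0.0
    peak_passed = False
    for value in samples:
        if value >= peak:
            peak = value            # still climbing (or equal): update running max
        else:
            peak_passed = True      # past the maximum
        if peak_passed and value < peak * (1 - POST_PEAK_DROP_RATIO):
            failures.append(
                f"{value:.3f} Gbps is more than {POST_PEAK_DROP_RATIO:.0%} "
                f"below peak {peak:.3f} Gbps"
            )
    sku_ok = peak >= SKU_MAX_GBPS * 0.9
    return failures, sku_ok


# Throughput climbs with connection count, peaks at 9.6 Gbps, then sags to 9.3.
failures, sku_ok = evaluate([4.1, 7.8, 9.2, 9.6, 9.5, 9.3])
print(failures)  # only the 9.3 sample fails: it is >2% below the 9.6 peak
print(sku_ok)    # True: 9.6 >= 0.9 * 10.0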