diff --git a/apps/worker/helpers/checkpoint_logger/flows.py b/apps/worker/helpers/checkpoint_logger/flows.py index 9d4b78ec7a..a581d8cb26 100644 --- a/apps/worker/helpers/checkpoint_logger/flows.py +++ b/apps/worker/helpers/checkpoint_logger/flows.py @@ -75,6 +75,50 @@ class UploadFlow(BaseFlow): FINAL = auto() +@failure_events( + "TOO_MANY_RETRIES", + "PROCESSOR_LOCK_ERROR", + "NOTIFY_LOCK_ERROR", + "NOTIFY_ALL_ERRORED", + "UNCAUGHT_RETRY_EXCEPTION", + "CELERY_FAILURE", + "CELERY_TIMEOUT", +) +@success_events( + "NOTIFIED", + "SKIPPING_NOTIFICATION", +) +@subflows( + ( + "ba_processing_duration", + "BUNDLE_ANALYSIS_BEGIN", + "PROCESSING_COMPLETE", + ), + ( + "ba_notification_latency", + "BUNDLE_ANALYSIS_BEGIN", + "NOTIFIED", + ), +) +@reliability_counters +class BundleAnalysisFlow(BaseFlow): + BUNDLE_ANALYSIS_BEGIN = auto() + PROCESSING_BEGIN = auto() + PROCESSING_COMPLETE = auto() + NOTIFIED = auto() + SKIPPING_NOTIFICATION = auto() + TOO_MANY_RETRIES = auto() + PROCESSOR_LOCK_ERROR = auto() + NOTIFY_LOCK_ERROR = auto() + NOTIFY_ALL_ERRORED = auto() + UNCAUGHT_RETRY_EXCEPTION = auto() + CELERY_FAILURE = auto() + CELERY_TIMEOUT = auto() + + # Sentinel checkpoint - not directly used + FINAL = auto() + + @failure_events( "TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT" ) diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index 9d5b7294b5..edcc980144 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -25,7 +25,11 @@ Repository, ) from helpers.checkpoint_logger import from_kwargs as load_checkpoints_from_kwargs -from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow +from helpers.checkpoint_logger.flows import ( + BundleAnalysisFlow, + TestResultsFlow, + UploadFlow, +) from helpers.clock import get_seconds_to_next_hour from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError from helpers.log_context import LogContext, set_log_context @@ -90,6 +94,8 @@ def on_timeout(self, soft: bool, timeout: int): if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.CELERY_TIMEOUT) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_TIMEOUT) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.CELERY_TIMEOUT) @@ -359,6 +365,8 @@ def safe_retry(self, countdown=None, exc=None, **kwargs): TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc() if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) sentry_sdk.capture_exception( @@ -428,7 +436,9 @@ def run(self, *args, **kwargs): log_context.populate_from_sqlalchemy(db_session) set_log_context(log_context) - load_checkpoints_from_kwargs([UploadFlow, TestResultsFlow], kwargs) + load_checkpoints_from_kwargs( + [UploadFlow, BundleAnalysisFlow, TestResultsFlow], kwargs + ) self.task_run_counter.inc() if ( @@ -476,6 +486,10 @@ def run(self, *args, **kwargs): except MaxRetriesExceededError: if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log( + BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION + ) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) return None @@ -490,12 +504,18 @@ def run(self, *args, **kwargs): except MaxRetriesExceededError: if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log( + BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION + ) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) return None - except MaxRetriesExceededError as ex: + except MaxRetriesExceededError: if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION) finally: @@ -556,6 +576,8 @@ def on_failure(self, exc, task_id, args, kwargs, einfo): if UploadFlow.has_begun(): UploadFlow.log(UploadFlow.CELERY_FAILURE) + if BundleAnalysisFlow.has_begun(): + BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_FAILURE) if TestResultsFlow.has_begun(): TestResultsFlow.log(TestResultsFlow.CELERY_FAILURE) @@ -719,6 +741,8 @@ def _call_upload_breadcrumb_task( upload_ids: list[int] = [], error: Errors | None = None, error_text: str | None = None, + task_name: str | None = None, + parent_task_id: str | None = None, ): """ Queue a task to create an upload breadcrumb. @@ -729,7 +753,11 @@ def _call_upload_breadcrumb_task( "commit_sha": commit_sha, "repo_id": repo_id, "breadcrumb_data": BreadcrumbData( - milestone=milestone, error=error, error_text=error_text + milestone=milestone, + error=error, + error_text=error_text, + task_name=task_name, + parent_task_id=parent_task_id, ), "upload_ids": upload_ids, "sentry_trace_id": current_sentry_trace_id(), diff --git a/libs/shared/shared/django_apps/upload_breadcrumbs/models.py b/libs/shared/shared/django_apps/upload_breadcrumbs/models.py index d427ad7bb7..6281527947 100644 --- a/libs/shared/shared/django_apps/upload_breadcrumbs/models.py +++ b/libs/shared/shared/django_apps/upload_breadcrumbs/models.py @@ -49,6 +49,9 @@ class Milestones(models.TextChoices): NOTIFICATIONS_TRIGGERED = "nt", _("Notifications triggered") UPLOAD_COMPLETE = "uc", _("Upload complete") NOTIFICATIONS_SENT = "ns", _("Notifications sent") + LOCK_ACQUIRING = "la", _("Acquiring lock") + LOCK_ACQUIRED = "lac", _("Lock acquired") + LOCK_RELEASED = "lr", _("Lock released") class Endpoints(models.TextChoices): @@ -157,6 +160,8 @@ class BreadcrumbData( uploader: str | None = None error: Errors | None = None error_text: str | None = None + task_name: str | None = None + parent_task_id: str | None = None @field_validator("*", mode="after") @classmethod