Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions apps/worker/helpers/checkpoint_logger/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,50 @@ class UploadFlow(BaseFlow):
FINAL = auto()


# Checkpoint names registered as failure outcomes of this flow.
# NOTE(review): `failure_events` is defined outside this view; presumably it
# marks these members as terminal failures -- confirm against its definition.
@failure_events(
    "TOO_MANY_RETRIES",
    "PROCESSOR_LOCK_ERROR",
    "NOTIFY_LOCK_ERROR",
    "NOTIFY_ALL_ERRORED",
    "UNCAUGHT_RETRY_EXCEPTION",
    "CELERY_FAILURE",
    "CELERY_TIMEOUT",
)
# Checkpoint names registered as success outcomes of this flow.
# NOTE(review): semantics of `success_events` are not visible here -- confirm.
@success_events(
    "NOTIFIED",
    "SKIPPING_NOTIFICATION",
)
# Each tuple appears to be (subflow name, start checkpoint, end checkpoint)
# -- TODO confirm against the `subflows` decorator.
@subflows(
    (
        "ba_processing_duration",
        # NOTE(review): this starts at BUNDLE_ANALYSIS_BEGIN rather than
        # PROCESSING_BEGIN, which no decorator on this class references --
        # confirm that is intended.
        "BUNDLE_ANALYSIS_BEGIN",
        "PROCESSING_COMPLETE",
    ),
    (
        "ba_notification_latency",
        "BUNDLE_ANALYSIS_BEGIN",
        "NOTIFIED",
    ),
)
@reliability_counters
class BundleAnalysisFlow(BaseFlow):
    """Checkpoints for the bundle analysis flow.

    Every string passed to the decorators above names a member declared in
    this class body, so renaming a member requires updating those strings too.
    """

    # Values come from `auto()`, so declaration order determines them
    # (assuming BaseFlow is an Enum -- TODO confirm); append rather than
    # reorder if stored or compared values matter.
    BUNDLE_ANALYSIS_BEGIN = auto()
    PROCESSING_BEGIN = auto()
    PROCESSING_COMPLETE = auto()
    # Listed in `success_events` above.
    NOTIFIED = auto()
    SKIPPING_NOTIFICATION = auto()
    # Listed in `failure_events` above.
    TOO_MANY_RETRIES = auto()
    PROCESSOR_LOCK_ERROR = auto()
    NOTIFY_LOCK_ERROR = auto()
    NOTIFY_ALL_ERRORED = auto()
    UNCAUGHT_RETRY_EXCEPTION = auto()
    CELERY_FAILURE = auto()
    CELERY_TIMEOUT = auto()

    # Sentinel checkpoint - not directly used
    FINAL = auto()


@failure_events(
"TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT"
)
Expand Down
36 changes: 32 additions & 4 deletions apps/worker/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
Repository,
)
from helpers.checkpoint_logger import from_kwargs as load_checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.checkpoint_logger.flows import (
BundleAnalysisFlow,
TestResultsFlow,
UploadFlow,
)
from helpers.clock import get_seconds_to_next_hour
from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
from helpers.log_context import LogContext, set_log_context
Expand Down Expand Up @@ -90,6 +94,8 @@ def on_timeout(self, soft: bool, timeout: int):

if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.CELERY_TIMEOUT)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_TIMEOUT)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.CELERY_TIMEOUT)

Expand Down Expand Up @@ -359,6 +365,8 @@ def safe_retry(self, countdown=None, exc=None, **kwargs):
TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc()
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
sentry_sdk.capture_exception(
Expand Down Expand Up @@ -428,7 +436,9 @@ def run(self, *args, **kwargs):

log_context.populate_from_sqlalchemy(db_session)
set_log_context(log_context)
load_checkpoints_from_kwargs([UploadFlow, TestResultsFlow], kwargs)
load_checkpoints_from_kwargs(
[UploadFlow, BundleAnalysisFlow, TestResultsFlow], kwargs
)

self.task_run_counter.inc()
if (
Expand Down Expand Up @@ -476,6 +486,10 @@ def run(self, *args, **kwargs):
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(
BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION
)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
Expand All @@ -490,12 +504,18 @@ def run(self, *args, **kwargs):
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(
BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION
)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
except MaxRetriesExceededError as ex:
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
finally:
Expand Down Expand Up @@ -556,6 +576,8 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.CELERY_FAILURE)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_FAILURE)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.CELERY_FAILURE)

Expand Down Expand Up @@ -719,6 +741,8 @@ def _call_upload_breadcrumb_task(
upload_ids: list[int] = [],
error: Errors | None = None,
error_text: str | None = None,
task_name: str | None = None,
parent_task_id: str | None = None,
):
"""
Queue a task to create an upload breadcrumb.
Expand All @@ -729,7 +753,11 @@ def _call_upload_breadcrumb_task(
"commit_sha": commit_sha,
"repo_id": repo_id,
"breadcrumb_data": BreadcrumbData(
milestone=milestone, error=error, error_text=error_text
milestone=milestone,
error=error,
error_text=error_text,
task_name=task_name,
parent_task_id=parent_task_id,
),
"upload_ids": upload_ids,
"sentry_trace_id": current_sentry_trace_id(),
Expand Down
5 changes: 5 additions & 0 deletions libs/shared/shared/django_apps/upload_breadcrumbs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class Milestones(models.TextChoices):
NOTIFICATIONS_TRIGGERED = "nt", _("Notifications triggered")
UPLOAD_COMPLETE = "uc", _("Upload complete")
NOTIFICATIONS_SENT = "ns", _("Notifications sent")
LOCK_ACQUIRING = "la", _("Acquiring lock")
LOCK_ACQUIRED = "lac", _("Lock acquired")
LOCK_RELEASED = "lr", _("Lock released")


class Endpoints(models.TextChoices):
Expand Down Expand Up @@ -157,6 +160,8 @@ class BreadcrumbData(
uploader: str | None = None
error: Errors | None = None
error_text: str | None = None
task_name: str | None = None
parent_task_id: str | None = None

@field_validator("*", mode="after")
@classmethod
Expand Down
Loading