Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions apps/worker/helpers/checkpoint_logger/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,50 @@ class UploadFlow(BaseFlow):
FINAL = auto()


@failure_events(
"TOO_MANY_RETRIES",
"PROCESSOR_LOCK_ERROR",
"NOTIFY_LOCK_ERROR",
"NOTIFY_ALL_ERRORED",
"UNCAUGHT_RETRY_EXCEPTION",
"CELERY_FAILURE",
"CELERY_TIMEOUT",
)
@success_events(
"NOTIFIED",
"SKIPPING_NOTIFICATION",
)
@subflows(
(
"ba_processing_duration",
"BUNDLE_ANALYSIS_BEGIN",
"PROCESSING_COMPLETE",
),
(
"ba_notification_latency",
"BUNDLE_ANALYSIS_BEGIN",
"NOTIFIED",
),
)
@reliability_counters
class BundleAnalysisFlow(BaseFlow):
BUNDLE_ANALYSIS_BEGIN = auto()
PROCESSING_BEGIN = auto()
PROCESSING_COMPLETE = auto()
NOTIFIED = auto()
SKIPPING_NOTIFICATION = auto()
TOO_MANY_RETRIES = auto()
PROCESSOR_LOCK_ERROR = auto()
NOTIFY_LOCK_ERROR = auto()
NOTIFY_ALL_ERRORED = auto()
UNCAUGHT_RETRY_EXCEPTION = auto()
CELERY_FAILURE = auto()
CELERY_TIMEOUT = auto()

# Sentinel checkpoint - not directly used
FINAL = auto()


@failure_events(
"TEST_RESULTS_ERROR", "UNCAUGHT_RETRY_EXCEPTION", "CELERY_FAILURE", "CELERY_TIMEOUT"
)
Expand Down
28 changes: 25 additions & 3 deletions apps/worker/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
Repository,
)
from helpers.checkpoint_logger import from_kwargs as load_checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.checkpoint_logger.flows import (
BundleAnalysisFlow,
TestResultsFlow,
UploadFlow,
)
from helpers.clock import get_seconds_to_next_hour
from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
from helpers.log_context import LogContext, set_log_context
Expand Down Expand Up @@ -90,6 +94,8 @@ def on_timeout(self, soft: bool, timeout: int):

if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.CELERY_TIMEOUT)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_TIMEOUT)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.CELERY_TIMEOUT)

Expand Down Expand Up @@ -359,6 +365,8 @@ def safe_retry(self, countdown=None, exc=None, **kwargs):
TASK_MAX_RETRIES_EXCEEDED_COUNTER.labels(task=self.name).inc()
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
sentry_sdk.capture_exception(
Expand Down Expand Up @@ -428,7 +436,9 @@ def run(self, *args, **kwargs):

log_context.populate_from_sqlalchemy(db_session)
set_log_context(log_context)
load_checkpoints_from_kwargs([UploadFlow, TestResultsFlow], kwargs)
load_checkpoints_from_kwargs(
[UploadFlow, BundleAnalysisFlow, TestResultsFlow], kwargs
)

self.task_run_counter.inc()
if (
Expand Down Expand Up @@ -476,6 +486,10 @@ def run(self, *args, **kwargs):
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(
BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION
)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
Expand All @@ -490,12 +504,18 @@ def run(self, *args, **kwargs):
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(
BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION
)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
return None
except MaxRetriesExceededError as ex:
except MaxRetriesExceededError:
if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.UNCAUGHT_RETRY_EXCEPTION)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.UNCAUGHT_RETRY_EXCEPTION)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.UNCAUGHT_RETRY_EXCEPTION)
finally:
Expand Down Expand Up @@ -556,6 +576,8 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

if UploadFlow.has_begun():
UploadFlow.log(UploadFlow.CELERY_FAILURE)
if BundleAnalysisFlow.has_begun():
BundleAnalysisFlow.log(BundleAnalysisFlow.CELERY_FAILURE)
if TestResultsFlow.has_begun():
TestResultsFlow.log(TestResultsFlow.CELERY_FAILURE)

Expand Down
Loading