From e7ff3d9c4d06df38ae55fc712815bf7953e35e8a Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 19 Feb 2026 22:26:01 +0900 Subject: [PATCH] feat: add lock lifecycle breadcrumbs and task tracking to coverage tasks Add lock lifecycle breadcrumbs (LOCK_ACQUIRING, LOCK_ACQUIRED, LOCK_RELEASED) around all lock acquisitions in upload_finisher and notify tasks. Add task_name and parent_task_id to all existing breadcrumb calls across upload_processor, upload_finisher, and notify tasks for pipeline traceability. upload_processor: - Add task_name/parent_task_id to all existing breadcrumb calls (no locks in this task) upload_finisher: - Lock lifecycle breadcrumbs around UPLOAD_PROCESSING lock - Lock lifecycle breadcrumbs around UPLOAD_FINISHER lock - task_name/parent_task_id on all breadcrumb calls notify: - Lock lifecycle breadcrumbs around NOTIFICATION lock - task_name/parent_task_id on all breadcrumb calls Co-authored-by: Cursor --- apps/worker/tasks/notify.py | 63 ++++++++++---- apps/worker/tasks/upload_finisher.py | 113 ++++++++++++++------------ apps/worker/tasks/upload_processor.py | 40 ++++----- 3 files changed, 122 insertions(+), 94 deletions(-) diff --git a/apps/worker/tasks/notify.py b/apps/worker/tasks/notify.py index 660d5caf55..92bbf911ca 100644 --- a/apps/worker/tasks/notify.py +++ b/apps/worker/tasks/notify.py @@ -80,6 +80,14 @@ def run_impl( ): milestone = Milestones.NOTIFICATIONS_SENT redis_connection = get_redis_connection() + + bc_kwargs = { + "commit_sha": commitid, + "repo_id": repoid, + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } + if self.has_upcoming_notifies_according_to_redis( redis_connection, repoid, commitid ): @@ -89,10 +97,7 @@ def run_impl( ) self.log_checkpoint(UploadFlow.SKIPPING_NOTIFICATION) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_OTHER_JOB, + milestone=milestone, error=Errors.INTERNAL_OTHER_JOB, **bc_kwargs ) return { "notified": False, @@ -107,6 +112,10 @@ def run_impl( lock_timeout=max(80, self.hard_time_limit_task), ) + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRING, **bc_kwargs + ) + try: lock_acquired = False with lock_manager.locked( @@ -114,7 +123,10 @@ def run_impl( retry_num=self.attempts, ): lock_acquired = True - return self.run_impl_within_lock( + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRED, **bc_kwargs + ) + result = self.run_impl_within_lock( db_session, repoid=repoid, commitid=commitid, @@ -122,6 +134,10 @@ def run_impl( empty_upload=empty_upload, **kwargs, ) + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_RELEASED, **bc_kwargs + ) + return result except LockRetry as err: ( log.info( @@ -136,16 +152,10 @@ def run_impl( ) self.log_checkpoint(UploadFlow.NOTIF_LOCK_ERROR) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_LOCK_ERROR, + milestone=milestone, error=Errors.INTERNAL_LOCK_ERROR, **bc_kwargs ) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - error=Errors.INTERNAL_OTHER_JOB, + milestone=milestone, error=Errors.INTERNAL_OTHER_JOB, **bc_kwargs ) return { "notified": False, @@ -175,12 +185,17 @@ def _attempt_retry( *args, **kwargs, ): + bc_kwargs = { + "commit_sha": commit.commitid, + "repo_id": commit.repoid, + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } try: self._call_upload_breadcrumb_task( - commit_sha=commit.commitid, - repo_id=commit.repoid, milestone=Milestones.NOTIFICATIONS_SENT, error=Errors.INTERNAL_RETRYING, + **bc_kwargs, ) self.retry(max_retries=max_retries, countdown=countdown) except MaxRetriesExceededError: @@ -196,10 +211,9 @@ def _attempt_retry( ) self.log_checkpoint(UploadFlow.NOTIF_TOO_MANY_RETRIES) self._call_upload_breadcrumb_task( - commit_sha=commit.commitid, - repo_id=commit.repoid, milestone=Milestones.NOTIFICATIONS_SENT, error=Errors.INTERNAL_OUT_OF_RETRIES, + **bc_kwargs, ) return { "notified": False, @@ -218,6 +232,10 @@ def run_impl_within_lock( **kwargs, ): milestone = Milestones.NOTIFICATIONS_SENT + bc_kwargs = { + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } log.info("Starting notifications", extra={"commit": commitid, "repoid": repoid}) commits_query = db_session.query(Commit).filter( @@ -237,6 +255,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=Milestones.NOTIFICATIONS_SENT, error=Errors.SKIPPED_NOTIFICATIONS, + **bc_kwargs, ) return { "notify_attempted": False, @@ -266,6 +285,7 @@ def run_impl_within_lock( repo_id=repoid, milestone=milestone, error=Errors.REPO_MISSING_VALID_BOT, + **bc_kwargs, ) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} except NoConfiguredAppsAvailable as exp: @@ -290,6 +310,7 @@ def run_impl_within_lock( repo_id=repoid, milestone=milestone, error=Errors.INTERNAL_APP_RATE_LIMITED, + **bc_kwargs, ) return self._attempt_retry( max_retries=10, @@ -339,6 +360,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=milestone, error=Errors.GIT_CLIENT_ERROR, + **bc_kwargs, ) return { "notified": False, @@ -356,6 +378,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=milestone, error=Errors.GIT_CLIENT_ERROR, + **bc_kwargs, ) return { "notified": False, @@ -439,6 +462,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=milestone, error=Errors.SKIPPED_NOTIFICATIONS, + **bc_kwargs, ) return { "notified": False, @@ -459,6 +483,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=milestone, error=Errors.REPORT_NOT_FOUND, + **bc_kwargs, ) return { "notified": False, @@ -504,6 +529,7 @@ def run_impl_within_lock( commit_sha=commit.commitid, repo_id=commit.repoid, milestone=milestone, + **bc_kwargs, ) log.info( "Notifications done", @@ -528,6 +554,7 @@ def run_impl_within_lock( repo_id=commit.repoid, milestone=milestone, error=Errors.SKIPPED_NOTIFICATIONS, + **bc_kwargs, ) return {"notified": False, "notifications": None} @@ -742,6 +769,8 @@ def submit_third_party_notifications( milestone=Milestones.NOTIFICATIONS_SENT, error=Errors.UNKNOWN, error_text=repr(e), + task_name=self.name, + parent_task_id=self.request.parent_id, ) raise diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 41e54cdc98..d56eaddffd 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -281,6 +281,14 @@ def run_impl( upload_ids = [upload["upload_id"] for upload in processing_results] + bc_kwargs = { + "commit_sha": commitid, + "repo_id": repoid, + "upload_ids": upload_ids, + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } + # Idempotency check: Skip if all uploads are already processed # This prevents wasted work if multiple finishers are triggered (e.g., from # visibility timeout re-queuing) or if finisher is manually retried @@ -341,12 +349,7 @@ def run_impl( ) UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - ) + self._call_upload_breadcrumb_task(milestone=milestone, **bc_kwargs) return log.info("run_impl: Handling finisher lock") @@ -366,11 +369,7 @@ def run_impl( except SoftTimeLimitExceeded: log.warning("run_impl: soft time limit exceeded") self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.TASK_TIMED_OUT, + milestone=milestone, error=Errors.TASK_TIMED_OUT, **bc_kwargs ) return { "error": "Soft time limit exceeded", @@ -385,12 +384,10 @@ def run_impl( extra={"upload_ids": upload_ids}, ) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, milestone=milestone, - upload_ids=upload_ids, error=Errors.UNKNOWN, error_text=repr(e), + **bc_kwargs, ) return { "error": str(e), @@ -412,6 +409,14 @@ def _process_reports_with_lock( repoid = commit.repoid commitid = commit.commitid + bc_kwargs = { + "commit_sha": commitid, + "repo_id": repoid, + "upload_ids": upload_ids, + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } + log.info("run_impl: Loaded commit diff") lock_manager = LockManager( @@ -421,12 +426,20 @@ def _process_reports_with_lock( blocking_timeout=None, ) + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRING, **bc_kwargs + ) + try: with lock_manager.locked( LockType.UPLOAD_PROCESSING, max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, retry_num=self.attempts, ): + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRED, **bc_kwargs + ) + db_session.refresh(commit) report_service = ReportService(commit_yaml) @@ -458,13 +471,13 @@ def _process_reports_with_lock( log.info("run_impl: Finished upload_finisher task") + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_RELEASED, **bc_kwargs + ) + except LockRetry as retry: self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_LOCK_ERROR, + error=Errors.INTERNAL_LOCK_ERROR, **bc_kwargs ) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( UPLOAD_PROCESSOR_MAX_RETRIES @@ -481,19 +494,11 @@ def _process_reports_with_lock( }, ) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_OUT_OF_RETRIES, + error=Errors.INTERNAL_OUT_OF_RETRIES, **bc_kwargs ) return self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_RETRYING, + error=Errors.INTERNAL_RETRYING, **bc_kwargs ) self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown @@ -513,6 +518,14 @@ def _handle_finisher_lock( commitid = commit.commitid repository = commit.repository + bc_kwargs = { + "commit_sha": commitid, + "repo_id": repoid, + "upload_ids": upload_ids, + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } + lock_manager = LockManager( repoid=repoid, commitid=commitid, @@ -520,12 +533,20 @@ def _handle_finisher_lock( blocking_timeout=None, ) + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRING, **bc_kwargs + ) + try: with lock_manager.locked( LockType.UPLOAD_FINISHER, max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, retry_num=self.attempts, ): + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_ACQUIRED, **bc_kwargs + ) + result = self.finish_reports_processing( db_session, commit, commit_yaml, processing_results ) @@ -569,22 +590,18 @@ def _handle_finisher_lock( log.info("handle_finisher_lock: Invalidating caches") self.invalidate_caches(lock_manager.redis_connection, commit) - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - ) + self._call_upload_breadcrumb_task(milestone=milestone, **bc_kwargs) log.info("handle_finisher_lock: Finished upload_finisher task") - return result + + self._call_upload_breadcrumb_task( + milestone=Milestones.LOCK_RELEASED, **bc_kwargs + ) + + return result except LockRetry as retry: self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_LOCK_ERROR, + error=Errors.INTERNAL_LOCK_ERROR, **bc_kwargs ) UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( @@ -602,19 +619,11 @@ def _handle_finisher_lock( }, ) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_OUT_OF_RETRIES, + error=Errors.INTERNAL_OUT_OF_RETRIES, **bc_kwargs ) return self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=milestone, - upload_ids=upload_ids, - error=Errors.INTERNAL_RETRYING, + error=Errors.INTERNAL_RETRYING, **bc_kwargs ) self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown @@ -660,6 +669,8 @@ def finish_reports_processing( upload_ids=[ upload["upload_id"] for upload in processing_results ], + task_name=self.name, + parent_task_id=self.request.parent_id, ) log.info( "Scheduling notify task", diff --git a/apps/worker/tasks/upload_processor.py b/apps/worker/tasks/upload_processor.py index 54ff5fc104..dccfb9e34f 100644 --- a/apps/worker/tasks/upload_processor.py +++ b/apps/worker/tasks/upload_processor.py @@ -63,11 +63,16 @@ def run_impl( extra={"arguments": arguments, "commit_yaml": commit_yaml}, ) + bc_kwargs = { + "commit_sha": commitid, + "repo_id": repoid, + "upload_ids": [arguments["upload_id"]], + "task_name": self.name, + "parent_task_id": self.request.parent_id, + } + self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=Milestones.PROCESSING_UPLOAD, - upload_ids=[arguments["upload_id"]], + milestone=Milestones.PROCESSING_UPLOAD, **bc_kwargs ) def on_processing_error(error: ProcessingError): @@ -90,12 +95,10 @@ def on_processing_error(error: ProcessingError): ub_error = Errors.UNKNOWN self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, milestone=Milestones.PROCESSING_UPLOAD, - upload_ids=[arguments["upload_id"]], error=ub_error, error_text=error.error_text, + **bc_kwargs, ) # Retry with exponential backoff if error is retryable and under max retries @@ -111,11 +114,7 @@ def on_processing_error(error: ProcessingError): }, ) self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=Milestones.PROCESSING_UPLOAD, - upload_ids=[arguments["upload_id"]], - error=Errors.INTERNAL_RETRYING, + error=Errors.INTERNAL_RETRYING, **bc_kwargs ) self.retry(max_retries=error.max_retries, countdown=countdown) elif error.is_retryable and self.request.retries >= error.max_retries: @@ -142,23 +141,12 @@ def on_processing_error(error: ProcessingError): UserYaml(commit_yaml), arguments, ) - except SoftTimeLimitExceeded as e: - self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=Milestones.PROCESSING_UPLOAD, - upload_ids=[arguments["upload_id"]], - error=Errors.TASK_TIMED_OUT, - ) + except SoftTimeLimitExceeded: + self._call_upload_breadcrumb_task(error=Errors.TASK_TIMED_OUT, **bc_kwargs) raise except Exception as e: self._call_upload_breadcrumb_task( - commit_sha=commitid, - repo_id=repoid, - milestone=Milestones.PROCESSING_UPLOAD, - upload_ids=[arguments["upload_id"]], - error=Errors.UNKNOWN, - error_text=repr(e), + error=Errors.UNKNOWN, error_text=repr(e), **bc_kwargs ) raise