diff --git a/Containerfile b/Containerfile index b5f65cd..8dcbf7b 100644 --- a/Containerfile +++ b/Containerfile @@ -1,6 +1,6 @@ FROM fedora:latest -RUN dnf install -y python3-ogr python3-copr python3-pip && dnf clean all +RUN dnf install -y python3-ogr python3-copr python3-koji python3-pip fedpkg krb5-workstation && dnf clean all RUN pip3 install --upgrade sentry-sdk && pip3 check diff --git a/pyproject.toml b/pyproject.toml index 7254cfb..3c840a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ classifiers = [ dependencies = [ "click", "copr", + "koji", "ogr", "sentry-sdk", ] diff --git a/src/validation/cli/__init__.py b/src/validation/cli/__init__.py index 0200b95..95349f4 100644 --- a/src/validation/cli/__init__.py +++ b/src/validation/cli/__init__.py @@ -10,23 +10,25 @@ from validation.tests.github import GithubTests from validation.tests.gitlab import GitlabTests +from validation.tests.pagure import PagureTests -logging.basicConfig(level=logging.INFO) +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) @click.group(context_settings={"help_option_names": ["-h", "--help"]}, invoke_without_command=True) @click.version_option(prog_name="validation") def validation(): - loop = asyncio.get_event_loop() - tasks = set() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + tasks = [] # GitHub if getenv("GITHUB_TOKEN"): logging.info("Running validation for GitHub.") - task = loop.create_task(GithubTests().run()) - - tasks.add(task) - task.add_done_callback(tasks.discard) + tasks.append(GithubTests().run()) else: logging.info("GITHUB_TOKEN not set, skipping the validation for GitHub.") @@ -51,7 +53,7 @@ def validation(): continue logging.info("Running validation for GitLab instance: %s", instance_url) - task = loop.create_task( + tasks.append( GitlabTests( instance_url=instance_url, namespace=namespace, @@ -59,7 +61,44 @@ def validation(): ).run(), ) - tasks.add(task) - 
task.add_done_callback(tasks.discard) + # Pagure + pagure_instances = [ + ("https://src.fedoraproject.org/", "rpms", "PAGURE_TOKEN"), + ] + for instance_url, namespace, token in pagure_instances: + if not getenv(token): + logging.info( + "%s not set, skipping the validation for Pagure instance: %s", + token, + instance_url, + ) + continue + + logging.info("Running validation for Pagure instance: %s", instance_url) + tasks.append( + PagureTests( + instance_url=instance_url, + namespace=namespace, + token_name=token, + ).run(), + ) + + if not tasks: + logging.error("No tokens configured, no validation tests to run") + raise SystemExit(1) + + logging.info("Running %d validation test suite(s)", len(tasks)) + try: + results = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) + logging.info("All validation tests completed") - loop.run_forever() + # Check if any test suite failed + failed_count = sum(1 for result in results if isinstance(result, Exception)) + if failed_count: + logging.error("%d test suite(s) failed", failed_count) + raise SystemExit(1) + except KeyboardInterrupt: + logging.info("Validation interrupted by user") + raise SystemExit(130) from None + finally: + loop.close() diff --git a/src/validation/helpers.py b/src/validation/helpers.py index a5c61db..4b96e09 100644 --- a/src/validation/helpers.py +++ b/src/validation/helpers.py @@ -2,18 +2,35 @@ # # SPDX-License-Identifier: MIT +import asyncio import logging +import re +import subprocess from functools import lru_cache from os import getenv +import koji as koji_module from copr.v3 import Client +class KerberosError(Exception): + """Exception raised for Kerberos-related errors.""" + + @lru_cache def copr(): return Client({"copr_url": "https://copr.fedorainfracloud.org"}) +@lru_cache +def koji(): + """ + Create and return a Koji session for querying Fedora Koji builds. 
+ """ + koji_url = getenv("KOJI_URL", "https://koji.fedoraproject.org/kojihub") + return koji_module.ClientSession(koji_url) + + @lru_cache def sentry_sdk(): if sentry_secret := getenv("SENTRY_SECRET"): @@ -32,3 +49,121 @@ def log_failure(message: str): return logging.warning(message) + + +async def extract_principal_from_keytab(keytab_file: str) -> str: + """ + Extract principal from the specified keytab file. + Assumes there is a single principal in the keytab. + + Args: + keytab_file: Path to a keytab file. + + Returns: + Extracted principal name. + """ + proc = await asyncio.create_subprocess_exec( + "klist", + "-k", + "-K", + "-e", + keytab_file, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode: + logging.error("klist command failed: %s", stderr.decode()) + msg = "klist command failed" + raise KerberosError(msg) + + # Parse klist output to extract principal + # Format: " 2 principal@REALM (aes256-cts-hmac-sha1-96) (0x...)" + key_pattern = re.compile(r"^\s*(\d+)\s+(\S+)\s+\((\S+)\)\s+\((\S+)\)$") + for line in stdout.decode().splitlines(): + if match := key_pattern.match(line): + # Return the principal associated with the first key + return match.group(2) + + msg = "No valid key found in the keytab file" + raise KerberosError(msg) + + +async def init_kerberos_ticket(keytab_file: str) -> str: + """ + Initialize Kerberos ticket from keytab file. 
+ + Args: + keytab_file: Path to keytab file + + Returns: + Principal name for which ticket was initialized + """ + # Extract principal from keytab + principal = await extract_principal_from_keytab(keytab_file) + logging.debug("Extracted principal from keytab: %s", principal) + + # Check if ticket already exists + proc = await asyncio.create_subprocess_exec( + "klist", + "-l", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + + if proc.returncode == 0: + # Parse existing principals + principals = [ + parts[0] + for line in stdout.decode().splitlines() + if "Expired" not in line + for parts in (line.split(),) + if len(parts) >= 1 and "@" in parts[0] + ] + + if principal in principals: + logging.info("Using existing Kerberos ticket for %s", principal) + return principal + + # Initialize new ticket + logging.info("Initializing Kerberos ticket for %s", principal) + proc = await asyncio.create_subprocess_exec( + "kinit", + "-k", + "-t", + keytab_file, + principal, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + + if proc.returncode: + logging.error("kinit failed: %s", stderr.decode()) + msg = "kinit command failed" + raise KerberosError(msg) + + logging.info("Kerberos ticket initialized for %s", principal) + return principal + + +async def destroy_kerberos_ticket(principal: str): + """ + Destroy Kerberos ticket for the specified principal. 
+ + Args: + principal: Principal name whose ticket should be destroyed + """ + logging.info("Destroying Kerberos ticket for %s", principal) + proc = await asyncio.create_subprocess_exec( + "kdestroy", + "-p", + principal, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + await proc.communicate() + + if proc.returncode: + logging.warning("Failed to destroy Kerberos ticket for %s", principal) diff --git a/src/validation/testcase/base.py b/src/validation/testcase/base.py index 072bb5a..187ebea 100644 --- a/src/validation/testcase/base.py +++ b/src/validation/testcase/base.py @@ -12,6 +12,7 @@ from github.GitRef import GitRef from gitlab.v4.objects import ProjectBranch from ogr.abstract import CommitFlag, GitProject, PullRequest +from ogr.exceptions import GithubAPIException from ogr.services.github.check_run import GithubCheckRun from validation.deployment import PRODUCTION_INFO, DeploymentInfo @@ -19,11 +20,25 @@ from validation.utils.trigger import Trigger +class TestFailureError(Exception): + """Exception raised when a test case fails with a specific failure message.""" + + class Testcase(ABC): - CHECK_TIME_FOR_REACTION = 5 - CHECK_TIME_FOR_SUBMIT_BUILDS = 45 - CHECK_TIME_FOR_BUILD = 20 - CHECK_TIME_FOR_WATCH_STATUSES = 30 + CHECK_TIME_FOR_STATUSES_TO_APPEAR = ( + 3 # minutes - time to wait for statuses to appear after trigger + ) + CHECK_TIME_FOR_REACTION = 3 # minutes - time to wait for commit statuses to be set to pending + CHECK_TIME_FOR_SUBMIT_BUILDS = 5 # minutes - time to wait for build to be submitted in Copr + CHECK_TIME_FOR_BUILD = 60 # minutes - time to wait for build to complete + CHECK_TIME_FOR_WATCH_STATUSES = 60 # minutes - time to watch for commit statuses + POLLING_INTERVAL = 2 # minutes - interval between status/build checks + # Initial wait times after triggering build, before first API check (API caching delays) + WAIT_AFTER_OPENED_PR = 2 # minutes - wait for API to reflect statuses after opening new PR + WAIT_AFTER_COMMENT_PUSH = 1 # 
minutes - wait after comment/push trigger + PACKIT_YAML_PATH = ".packit.yaml" + MAX_COMMENTS_TO_CHECK = 5 # Limit comment fetching to avoid excessive API calls + HTTP_FORBIDDEN = 403 # HTTP status code for forbidden/access denied def __init__( self, @@ -32,6 +47,7 @@ def __init__( trigger: Trigger = Trigger.pr_opened, deployment: DeploymentInfo | None = None, comment: str | None = None, + existing_prs: list | None = None, ): self.project = project self.pr = pr @@ -42,30 +58,107 @@ def __init__( self._copr_project_name = None self.deployment = deployment or PRODUCTION_INFO self.comment = comment - self.loop = asyncio.get_event_loop() self._build = None - self._statuses: list[GithubCheckRun] | list[CommitFlag] = [] + self._statuses: list[GithubCheckRun | CommitFlag] = [] + self._build_triggered_at: datetime | None = None + self._existing_prs = existing_prs # Cache to avoid re-fetching in create_pr() + + @property + def copr_project_name(self): + """ + Get the name of Copr project from id of the PR. + :return: + """ + if self.pr and not self._copr_project_name: + self._copr_project_name = self.construct_copr_project_name() + return self._copr_project_name + + def _cleanup(self): + """ + Hook for subclasses to perform cleanup after test completion. + Called in finally block to ensure cleanup happens even if test fails. + """ - async def run_test(self): + @staticmethod + def _ensure_aware_datetime(dt: datetime) -> datetime: + """ + Convert a naive datetime to UTC-aware datetime. + If already timezone-aware, return as-is. + + Args: + dt: A datetime object (naive or aware) + + Returns: + Timezone-aware datetime (assumes UTC for naive datetimes) + """ + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt + + def _check_for_error_comment(self) -> str | None: + """ + Check for new error comments from packit-service. + Uses generator to avoid fetching all comment pages. 
+ + Returns: + The comment body if a new error comment is found, None otherwise + """ + if not self.pr: + return None + + # Get comments in reverse order (newest first) + # Generator is lazy - stops fetching pages once we find what we need + for i, comment in enumerate(self.pr.get_comments(reverse=True)): + if comment.author == self.account_name: + # Found a comment from packit-service + return comment.body + # Only check first MAX_COMMENTS_TO_CHECK comments to avoid excessive API calls + if i >= self.MAX_COMMENTS_TO_CHECK - 1: + break + + return None + + async def run_test(self) -> bool: """ Run all checks, if there is any failure message, send it to Sentry and in case of opening PR close it. + + Returns: + bool: True if test passed, False if test failed """ + pr_id = f"PR#{self.pr.id}" if self.pr else "new PR" + logging.info("Starting test for %s (%s trigger)", pr_id, self.trigger.value) + test_passed = False try: await self.run_checks() if self.failure_msg: message = f"{self.pr.title} ({self.pr.url}) failed: {self.failure_msg}" - + logging.error("Test failed: %s", message) log_failure(message) + # Raise exception with failure message for better error reporting + raise TestFailureError(self.failure_msg.strip()) + logging.info("Test passed for %s", pr_id) + test_passed = True if self.trigger == Trigger.pr_opened: + logging.debug("Closing PR and deleting branch for %s", pr_id) self.pr.close() - self.pr_branch_ref.delete() + if self.pr_branch_ref: + self.pr_branch_ref.delete() + except TestFailureError: + # Re-raise TestFailureError to preserve the failure message + raise except Exception as e: - msg = f"Validation test {self.pr.title} ({self.pr.url}) failed: {e}" + pr_info = f"{self.pr.title} ({self.pr.url})" if self.pr else "new PR" + msg = f"Validation test {pr_info} failed: {e}" logging.error(msg) tb = traceback.format_exc() logging.error(tb) + test_passed = False + finally: + self._cleanup() + + return test_passed def trigger_build(self): """ @@ -76,8 +169,29 
@@ def trigger_build(self): self.pr if self.pr else "new PR", ) if self.trigger == Trigger.comment: + if not self.pr: + msg = "Cannot post comment: PR is not set" + raise ValueError(msg) + comment = self.comment or self.deployment.pr_comment - self.pr.comment(comment) + try: + self.pr.comment(comment) + except GithubAPIException as e: + if e.response_code == self.HTTP_FORBIDDEN: + error_msg = ( + f"Failed to post comment to PR {self.pr.url} (HTTP 403 Forbidden).\n" + "This typically means:\n" + " 1. The PR has reached GitHub's 2,500 comment limit and " + "commenting is disabled, OR\n" + " 2. The PR has been closed/locked by GitHub.\n" + f"Please check the PR at {self.pr.url} and verify its status.\n" + "If the PR has hit the comment limit, close it and create a " + "fresh test PR.\n" + ) + logging.error(error_msg) + raise RuntimeError(error_msg) from e + # Re-raise if it's a different error + raise elif self.trigger == Trigger.push: self.push_to_pr() else: @@ -98,16 +212,29 @@ def create_pr(self): """ source_branch = f"test/{self.deployment.name}/opened_pr" pr_title = f"Basic test case ({self.deployment.name}): opened PR trigger" + logging.info("Creating new PR: %s from branch %s", pr_title, source_branch) self.delete_previous_branch(source_branch) + # Delete the PR from the previous test run if it exists. 
- existing_pr = [pr for pr in self.project.get_pr_list() if pr.title == pr_title] + # Use cached PR list from constructor to avoid re-fetching + if self._existing_prs is None: + logging.debug("Fetching PR list to check for existing PR...") + existing_prs = list(self.project.get_pr_list()) + else: + logging.debug("Using cached PR list to check for existing PR") + existing_prs = self._existing_prs + + existing_pr = [pr for pr in existing_prs if pr.title == pr_title] if len(existing_pr) == 1: + logging.debug("Closing existing PR: %s", existing_pr[0].url) existing_pr[0].close() + logging.debug("Creating file in new branch: %s", source_branch) self.create_file_in_new_branch(source_branch) if self.deployment.opened_pr_trigger__packit_yaml_fix: self.fix_packit_yaml(source_branch) + logging.debug("Creating PR...") self.pr = self.project.create_pr( title=pr_title, body="This test case is triggered automatically by our validation script.", @@ -115,17 +242,48 @@ def create_pr(self): source_branch=source_branch, ) self.head_commit = self.pr.head_commit + logging.info("PR created: %s", self.pr.url) async def run_checks(self): """ Run all checks of the test case. 
""" - await self.check_build_submitted() + # Check if this is a "skip build" test - if so, skip Copr checks + skip_copr_checks = False + if self.pr and "skip" in self.pr.title.lower() and "build" in self.pr.title.lower(): + skip_copr_checks = True + logging.info("Detected 'skip build' test: %s - skipping Copr checks", self.pr.title) - if not self._build: - return + if skip_copr_checks: + # For skip_build tests, trigger the build but don't wait for Copr submission/completion + self._build_triggered_at = datetime.now(tz=timezone.utc) + self.trigger_build() + + # Wait for API to reflect newly created statuses (caching/eventual consistency) + if self.trigger == Trigger.pr_opened: + logging.debug( + "Waiting %d minute(s) for API to reflect new statuses...", + self.WAIT_AFTER_OPENED_PR, + ) + await asyncio.sleep(self.WAIT_AFTER_OPENED_PR * 60) + else: + logging.debug( + "Waiting %d minute(s) for webhook processing...", + self.WAIT_AFTER_COMMENT_PUSH, + ) + await asyncio.sleep(self.WAIT_AFTER_COMMENT_PUSH * 60) + + # Check that statuses are set (should show build was skipped) + await self.check_pending_check_runs() + else: + # Normal flow: check build submission and completion + await self.check_build_submitted() + + if not self._build: + return + + await self.check_build(self._build.id) - await self.check_build(self._build.id) await self.check_completed_statuses() self.check_comment() @@ -134,31 +292,48 @@ async def check_pending_check_runs(self): Check whether some check run is set to queued (they are updated in loop, so it is enough). 
""" - status_names = [self.get_status_name(status) for status in self.get_statuses()] + # Don't filter by recency here - just check that statuses exist + # Recency filtering happens later in watch_statuses() to exclude old completed statuses + all_statuses = self.get_statuses() + status_names = [self.get_status_name(status) for status in all_statuses] - watch_end = datetime.now(tz=timezone.utc) + timedelta(minutes=self.CHECK_TIME_FOR_REACTION) - failure_message = ( - "Github check runs were not set to queued in time " - f"({self.CHECK_TIME_FOR_REACTION} minutes).\n" + # Phase 1: Wait for statuses to appear + watch_end = datetime.now(tz=timezone.utc) + timedelta( + minutes=self.CHECK_TIME_FOR_STATUSES_TO_APPEAR, ) # when a new PR is opened while len(status_names) == 0: if datetime.now(tz=timezone.utc) > watch_end: - self.failure_msg += failure_message + self.failure_msg += ( + f"Commit statuses did not appear in time " + f"({self.CHECK_TIME_FOR_STATUSES_TO_APPEAR} minutes).\n" + ) return - status_names = [self.get_status_name(status) for status in self.get_statuses()] await asyncio.sleep(30) + all_statuses = self.get_statuses() + status_names = [self.get_status_name(status) for status in all_statuses] logging.info( "Watching pending statuses for commit %s", self.head_commit, ) + + # Small delay before entering polling loop to avoid rapid API calls + await asyncio.sleep(5) + + # Phase 2: Wait for statuses to be set to pending (reset timeout) + watch_end = datetime.now(tz=timezone.utc) + timedelta(minutes=self.CHECK_TIME_FOR_REACTION) + while True: if datetime.now(tz=timezone.utc) > watch_end: - self.failure_msg += failure_message + self.failure_msg += ( + f"Commit statuses were not set to pending in time " + f"({self.CHECK_TIME_FOR_REACTION} minutes).\n" + ) return + # Check all statuses with matching names (don't filter by recency yet) new_statuses = [ status for status in self.get_statuses() @@ -172,13 +347,16 @@ async def check_pending_check_runs(self): if not 
self.is_status_completed(status): return - await asyncio.sleep(60) + await asyncio.sleep(self.POLLING_INTERVAL * 60) async def check_build_submitted(self): """ Check whether the build was submitted in Copr in time. """ - if self.pr: + # Only check for existing builds if PR already exists + # For new PR test, there can't be any existing builds + old_build_len = 0 + if self.pr and self.trigger != Trigger.pr_opened: try: old_build_len = len( copr().build_proxy.get_list(self.deployment.copr_user, self.copr_project_name), @@ -186,14 +364,23 @@ async def check_build_submitted(self): except Exception: old_build_len = 0 - old_comment_len = len(self.pr.get_comments()) - else: - # the PR is not created yet - old_build_len = 0 - old_comment_len = 0 - + self._build_triggered_at = datetime.now(tz=timezone.utc) self.trigger_build() + # Wait for API to reflect newly created statuses (caching/eventual consistency) + if self.trigger == Trigger.pr_opened: + logging.debug( + "Waiting %d minute(s) for API to reflect new statuses...", + self.WAIT_AFTER_OPENED_PR, + ) + await asyncio.sleep(self.WAIT_AFTER_OPENED_PR * 60) + else: + logging.debug( + "Waiting %d minute(s) for webhook processing...", + self.WAIT_AFTER_COMMENT_PUSH, + ) + await asyncio.sleep(self.WAIT_AFTER_COMMENT_PUSH * 60) + watch_end = datetime.now(tz=timezone.utc) + timedelta( minutes=self.CHECK_TIME_FOR_SUBMIT_BUILDS, ) @@ -221,23 +408,20 @@ async def check_build_submitted(self): except Exception as e: # project does not exist yet msg = f"Copr project doesn't exist yet: {e}" - logging.warning(msg) + logging.debug(msg) + await asyncio.sleep(30) continue if len(new_builds) >= old_build_len + 1: self._build = new_builds[0] return - new_comments = self.pr.get_comments(reverse=True) - new_comments = new_comments[: (len(new_comments) - old_comment_len)] - - if len(new_comments) > 1: - comment = [ - comment.body for comment in new_comments if comment.author == self.account_name - ] - if len(comment) > 0: + # Check for new 
error comments from packit-service after build was triggered + if self.pr: + error_comment = self._check_for_error_comment() + if error_comment: self.failure_msg += ( - f"New github comment from p-s while submitting Copr build: {comment[0]}\n" + f"New comment from packit-service while submitting build: {error_comment}\n" ) await asyncio.sleep(120) @@ -262,7 +446,7 @@ async def check_build(self, build_id): build = copr().build_proxy.get(build_id) if build.state == state_reported: - await asyncio.sleep(60) + await asyncio.sleep(self.POLLING_INTERVAL * 60) continue state_reported = build.state @@ -280,38 +464,50 @@ async def check_build(self, build_id): ) return - await asyncio.sleep(60) + await asyncio.sleep(self.POLLING_INTERVAL * 60) def check_comment(self): """ - Check whether p-s has commented when the Copr build was not successful. + Check whether packit-service has commented when the Copr build was not successful. """ failure = "The build in Copr was not successful." in self.failure_msg - if failure: - packit_comments = [ - comment - for comment in self.pr.get_comments(reverse=True) - if comment.author == self.account_name - ] - if not packit_comments: + if failure and self.pr: + # Check recent comments (newest first) using generator + found_packit_comment = False + for i, comment in enumerate(self.pr.get_comments(reverse=True)): + if comment.author == self.account_name: + found_packit_comment = True + break + # Only check first MAX_COMMENTS_TO_CHECK comments + if i >= self.MAX_COMMENTS_TO_CHECK - 1: + break + + if not found_packit_comment: self.failure_msg += ( - "No comment from p-s about unsuccessful last copr build found.\n" + "No comment from packit-service about unsuccessful last Copr build found.\n" ) + def _get_packit_yaml_ref(self, branch: str) -> str: + """ + Get the git ref to read .packit.yaml from. + Can be overridden in subclasses (e.g., Pagure reads from default branch). 
+ """ + return branch + def fix_packit_yaml(self, branch: str): """ Update .packit.yaml file in the branch according to the deployment needs """ - path = ".packit.yaml" - packit_yaml_content = self.project.get_file_content(path=path, ref=branch) + ref = self._get_packit_yaml_ref(branch) + packit_yaml_content = self.project.get_file_content(path=self.PACKIT_YAML_PATH, ref=ref) packit_yaml_content = packit_yaml_content.replace( self.deployment.opened_pr_trigger__packit_yaml_fix.from_str, self.deployment.opened_pr_trigger__packit_yaml_fix.to_str, ) self.update_file_and_commit( - path=path, + path=self.PACKIT_YAML_PATH, commit_msg=self.deployment.opened_pr_trigger__packit_yaml_fix.git_msg, content=packit_yaml_content, branch=branch, @@ -345,36 +541,56 @@ async def watch_statuses(self): ) while True: - self._statuses = self.get_statuses() + all_statuses = self.get_statuses() + # Filter to only recent statuses (created after build was triggered) + self._statuses = [status for status in all_statuses if self.is_status_recent(status)] + + # Log if we filtered out any old statuses + filtered_count = len(all_statuses) - len(self._statuses) + if filtered_count > 0: + logging.debug( + "Filtered out %d old status(es) from before build was triggered", + filtered_count, + ) - if all(self.is_status_completed(status) for status in self._statuses): + # Only consider checks complete if we have statuses AND they're all done + if self._statuses and all( + self.is_status_completed(status) for status in self._statuses + ): break if datetime.now(tz=timezone.utc) > watch_end: - self.failure_msg += ( - "These check runs were not completed in " - f"{self.CHECK_TIME_FOR_WATCH_STATUSES} minutes" - " after Copr build had been built:\n" - ) - for status in self._statuses: - if not self.is_status_completed(status): - self.failure_msg += f"{self.get_status_name(status)}\n" + if not self._statuses: + self.failure_msg += ( + "No commit statuses found after " + f"{self.CHECK_TIME_FOR_WATCH_STATUSES} 
minutes. " + "packit-service may not have responded to the PR.\n" + ) + else: + self.failure_msg += ( + "These commit statuses were not completed in " + f"{self.CHECK_TIME_FOR_WATCH_STATUSES} minutes" + " after the build was submitted:\n" + ) + for status in self._statuses: + if not self.is_status_completed(status): + self.failure_msg += f"{self.get_status_name(status)}\n" return - await asyncio.sleep(60) + await asyncio.sleep(self.POLLING_INTERVAL * 60) @property @abstractmethod - def account_name(self): + def account_name(self) -> str: """ - Name of the (bot) account in GitHub/GitLab. + Get the name of the (bot) account in GitHub/GitLab/Pagure. """ - @property @abstractmethod - def copr_project_name(self): + def construct_copr_project_name(self) -> str: """ - Name of Copr project from id of the PR. + Construct the Copr project name from the PR. + Used by GitHub/GitLab. Pagure overrides to raise NotImplementedError. """ @abstractmethod @@ -395,6 +611,13 @@ def is_status_successful(self, status: Union[GithubCheckRun, CommitFlag]) -> boo Check whether the status is in successful state. """ + @abstractmethod + def is_status_recent(self, status: Union[GithubCheckRun, CommitFlag]) -> bool: + """ + Check whether the status was created after the build was triggered. + This filters out old statuses from previous test runs. 
+ """ + @abstractmethod def delete_previous_branch(self, ref: str): """ diff --git a/src/validation/testcase/github.py b/src/validation/testcase/github.py index e6f7eb6..172199e 100644 --- a/src/validation/testcase/github.py +++ b/src/validation/testcase/github.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: MIT -from functools import cached_property +from datetime import timedelta from github import InputGitAuthor from github.Commit import Commit @@ -24,8 +24,7 @@ class GithubTestcase(Testcase): def account_name(self): return self.deployment.github_bot_name - @cached_property - def copr_project_name(self) -> str: + def construct_copr_project_name(self) -> str: return f"packit-hello-world-{self.pr.id}" def get_status_name(self, status: GithubCheckRun) -> str: @@ -59,6 +58,23 @@ def is_status_successful(self, status: GithubCheckRun) -> bool: def is_status_completed(self, status: GithubCheckRun) -> bool: return status.status == GithubCheckRunStatus.completed + def is_status_recent(self, status: GithubCheckRun) -> bool: + """ + Check if the status was created after the build was triggered. + Uses started_at timestamp with a 1-minute buffer for clock skew. 
+ """ + if not self._build_triggered_at: + return True # No trigger time set, accept all statuses + if not status.started_at: + return True # No timestamp on status, accept it + + # Convert naive datetime to UTC-aware if needed + status_time = self._ensure_aware_datetime(status.started_at) + + # Allow 1 minute buffer for clock skew + buffer_time = self._build_triggered_at - timedelta(minutes=1) + return status_time >= buffer_time + def delete_previous_branch(self, branch: str): existing_branch = self.project.github_repo.get_git_matching_refs(f"heads/{branch}") if existing_branch.totalCount: diff --git a/src/validation/testcase/gitlab.py b/src/validation/testcase/gitlab.py index 9f8b9e2..e13080d 100644 --- a/src/validation/testcase/gitlab.py +++ b/src/validation/testcase/gitlab.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: MIT -from functools import cached_property +import logging +from datetime import timedelta from gitlab import GitlabGetError from ogr.abstract import CommitFlag, CommitStatus @@ -18,8 +19,7 @@ class GitlabTestcase(Testcase): def account_name(self): return self.deployment.gitlab_account_name - @cached_property - def copr_project_name(self) -> str: + def construct_copr_project_name(self) -> str: return f"{self.project.service.hostname}-{self.project.namespace}-hello-world-{self.pr.id}" def get_status_name(self, status: CommitFlag) -> str: @@ -41,12 +41,50 @@ def create_file_in_new_branch(self, branch: str): }, ) + def _check_status_author(self, status: CommitFlag) -> bool: + """ + Check if status author matches the account name. + Returns True if match, False otherwise (including on errors). 
+ """ + try: + if not status._raw_commit_flag or not status._raw_commit_flag.author: + return False + author_username = status._raw_commit_flag.author["username"] + logging.debug( + "Status '%s' by '%s' - Match: %s", + status.context, + author_username, + author_username == self.account_name, + ) + return author_username == self.account_name + except (KeyError, AttributeError, TypeError) as e: + logging.warning( + "Failed to get author for status %s: %s - Raw: %s", + status.context, + e, + status._raw_commit_flag, + ) + return False + def get_statuses(self) -> list[CommitFlag]: - return [ - status - for status in self.project.get_commit_statuses(commit=self.head_commit) - if status._raw_commit_flag.author["username"] == self.account_name - ] + all_statuses = list(self.project.get_commit_statuses(commit=self.head_commit)) + + logging.debug( + "Fetching statuses for commit %s, looking for author: %s", + self.head_commit, + self.account_name, + ) + + filtered_statuses = [status for status in all_statuses if self._check_status_author(status)] + + logging.debug( + "Found %d/%d statuses from %s", + len(filtered_statuses), + len(all_statuses), + self.account_name, + ) + + return filtered_statuses def is_status_successful(self, status: CommitFlag) -> bool: return status.state == CommitStatus.success @@ -57,6 +95,23 @@ def is_status_completed(self, status: CommitFlag) -> bool: CommitStatus.pending, ] + def is_status_recent(self, status: CommitFlag) -> bool: + """ + Check if the status was created after the build was triggered. + Uses created timestamp with a 1-minute buffer for clock skew. 
+ """ + if not self._build_triggered_at: + return True # No trigger time set, accept all statuses + if not status.created: + return True # No timestamp on status, accept it + + # Convert naive datetime to UTC-aware if needed + status_time = self._ensure_aware_datetime(status.created) + + # Allow 1 minute buffer for clock skew + buffer_time = self._build_triggered_at - timedelta(minutes=1) + return status_time >= buffer_time + def delete_previous_branch(self, branch: str): try: existing_branch = self.project.gitlab_repo.branches.get(branch) @@ -71,6 +126,11 @@ def update_file_and_commit(self, path: str, commit_msg: str, content: str, branc file.save(branch=branch, commit_message=commit_msg) def create_empty_commit(self, branch: str, commit_msg: str) -> str: - data = {"branch": branch, "commit_message": commit_msg, "actions": []} + data = { + "branch": branch, + "commit_message": commit_msg, + "actions": [], + "allow_empty": True, + } commit = self.project.gitlab_repo.commits.create(data) return commit.id diff --git a/src/validation/testcase/pagure.py b/src/validation/testcase/pagure.py new file mode 100644 index 0000000..c571c90 --- /dev/null +++ b/src/validation/testcase/pagure.py @@ -0,0 +1,715 @@ +# SPDX-FileCopyrightText: 2023-present Contributors to the Packit Project. 
+# +# SPDX-License-Identifier: MIT + +import asyncio +import configparser +import logging +import os +import shutil +import subprocess +import tempfile +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from ogr.abstract import CommitFlag, CommitStatus +from ogr.services.pagure import PagureProject + +from validation.helpers import koji +from validation.testcase.base import Testcase + +# Koji task states +KOJI_TASK_FREE = 0 +KOJI_TASK_OPEN = 1 +KOJI_TASK_COMPLETED = 2 +KOJI_TASK_CANCELED = 3 +KOJI_TASK_ASSIGNED = 4 +KOJI_TASK_FAILED = 5 + + +class KojiBuildWrapper: + """Wrapper to make Koji build dict compatible with Copr build object interface.""" + + def __init__(self, build_dict: dict): + self._build = build_dict + + @property + def id(self): + """Return the build ID from the Koji build dict.""" + return self._build.get("build_id") or self._build.get("id") + + +class PagureTestcase(Testcase): + """ + Testcase implementation for Pagure-based forges (src.fedoraproject.org, etc.). + + This testcase uses fedpkg for repository cloning and SSH keys for git operations. + The Pagure account name defaults to 'packit-ci-test-bot' but can be overridden. 
@property
def account_name(self):
    """
    Return the Pagure account name used by this testcase.

    The ``PAGURE_ACCOUNT_NAME`` environment variable overrides the
    built-in default ``packit-ci-test-bot``.
    """
    return os.environ.get("PAGURE_ACCOUNT_NAME", "packit-ci-test-bot")
+ """ + msg = "Pagure uses Koji builds, not Copr builds" + raise NotImplementedError(msg) + + def _cleanup(self): + """Clean up temporary directories created for git operations and config.""" + if self._temp_dir and Path(self._temp_dir).exists(): + logging.debug("Cleaning up temporary directory: %s", self._temp_dir) + shutil.rmtree(self._temp_dir) + self._temp_dir = None + if self._config_dir and Path(self._config_dir).exists(): + logging.debug("Cleaning up config directory: %s", self._config_dir) + shutil.rmtree(self._config_dir) + self._config_dir = None + + def _setup_fedpkg_token(self): + """ + Write PAGURE_TOKEN to fedpkg config file in a temporary directory. + Returns the config file path to be used with --user-config option. + """ + token = os.getenv("PAGURE_TOKEN") + if not token: + logging.warning("PAGURE_TOKEN not set, fedpkg commands may fail") + return None + + # Create temporary config directory + self._config_dir = tempfile.mkdtemp(prefix="validation-fedpkg-config-") + config_file = Path(self._config_dir) / "fedpkg.conf" + + # Create config with token + config = configparser.ConfigParser() + config.add_section("fedpkg.distgit") + config.set("fedpkg.distgit", "token", token) + + # Write config + with open(config_file, "w") as f: + config.write(f) + + logging.debug("Wrote PAGURE_TOKEN to %s", config_file) + return str(config_file) + + def _get_authenticated_username(self): + """Get the authenticated user's username.""" + if not hasattr(self, "_username"): + self._username = self.project.service.user.get_username() + logging.debug("Authenticated user: %s", self._username) + return self._username + + def _ensure_fork_exists(self): + """Ensure a fork exists under the authenticated user.""" + user = self._get_authenticated_username() + + # Try to create the fork, ignore error if it already exists + try: + logging.info("Creating fork under user %s", user) + self.project.fork_create() + logging.info("Fork created successfully") + except Exception as e: + # 
Fork likely already exists + if "already exists" in str(e): + logging.debug("Fork already exists under user %s", user) + else: + # Some other error, re-raise + raise + + def _setup_git_repo(self): + """Clone the repository using fedpkg and set up fork.""" + if self._temp_dir is None: + # Set up fedpkg token from environment variable + config_file = self._setup_fedpkg_token() + + # Create parent temporary directory + parent_temp = tempfile.mkdtemp(prefix="validation-pagure-") + logging.debug("Created temporary directory: %s", parent_temp) + + # Check for SSH key to use for git operations + ssh_key_path = os.getenv("PAGURE_SSH_KEY") + if ssh_key_path and os.path.exists(ssh_key_path): + logging.debug("Will use SSH key: %s", ssh_key_path) + else: + logging.warning( + "PAGURE_SSH_KEY not set or file doesn't exist, SSH authentication may fail", + ) + + # Use fedpkg to clone + package_name = self.project.repo + logging.info("Cloning %s using fedpkg", package_name) + + # Build fedpkg command with config file if available + clone_cmd = ["fedpkg"] + if config_file: + clone_cmd.extend(["--user-config", config_file]) + clone_cmd.extend(["clone", "-a", package_name]) + + try: + subprocess.run( # noqa: S603 + clone_cmd, + cwd=parent_temp, + check=True, + capture_output=True, + text=True, + ) + self._temp_dir = str(Path(parent_temp) / package_name) + logging.debug("Repository cloned to: %s", self._temp_dir) + except subprocess.CalledProcessError as e: + logging.error("Failed to clone with fedpkg: %s\nstderr: %s", e, e.stderr) + shutil.rmtree(parent_temp) + raise + + # Ensure fork exists via API + self._ensure_fork_exists() + + # Use fedpkg to set up fork remote + logging.info("Setting up fork with fedpkg") + user = self.account_name + + # Build fedpkg fork command with config file if available + fork_cmd = ["fedpkg"] + if config_file: + fork_cmd.extend(["--user-config", config_file]) + fork_cmd.append("fork") + + try: + subprocess.run( # noqa: S603 + fork_cmd, + 
cwd=self._temp_dir, + check=True, + capture_output=True, + text=True, + ) + + # fedpkg fork creates HTTPS remote, but we need SSH for Kerberos + # Change the fork remote URL to SSH + hostname = self.project.service.hostname + if hostname == "src.fedoraproject.org": + git_hostname = "pkgs.fedoraproject.org" + else: + git_hostname = hostname + + ssh_url = f"ssh://{user}@{git_hostname}/forks/{user}/{self.project.namespace}/{self.project.repo}.git" + logging.debug("Changing fork remote to SSH: %s", ssh_url) + + subprocess.run( # noqa: S603 + ["git", "remote", "set-url", user, ssh_url], # noqa: S607 + cwd=self._temp_dir, + check=True, + capture_output=True, + ) + logging.debug("Fork remote configured with SSH URL") + + # Configure git to use SSH key if provided + ssh_key_path = os.getenv("PAGURE_SSH_KEY") + if ssh_key_path: + ssh_command = f"ssh -i {ssh_key_path} -o IdentitiesOnly=yes" + subprocess.run( # noqa: S603 + ["git", "config", "core.sshCommand", ssh_command], # noqa: S607 + cwd=self._temp_dir, + check=True, + capture_output=True, + ) + logging.debug("Configured git to use SSH key: %s", ssh_key_path) + + except subprocess.CalledProcessError as e: + logging.warning("Fork setup failed: %s\nstderr: %s", e, e.stderr) + + return self._temp_dir + + def create_file_in_new_branch(self, branch: str): + """Create a new branch and add a file using git operations.""" + repo_dir = self._setup_git_repo() + + # Create and checkout new branch + logging.info("Creating branch: %s", branch) + subprocess.run( # noqa: S603 + ["git", "checkout", "-b", branch], # noqa: S607 + cwd=repo_dir, + check=True, + capture_output=True, + ) + + # Create a test file + test_file = Path(repo_dir) / "test.txt" + test_file.write_text("Testing the opened PR trigger.") + + # Add and commit the file + subprocess.run( # noqa: S603 + ["git", "add", "test.txt"], # noqa: S607 + cwd=repo_dir, + check=True, + capture_output=True, + ) + subprocess.run( # noqa: S603 + ["git", "commit", "-m", "Opened PR 
def create_pr(self):
    """
    Create the "opened PR" test pull request from the fork via the Pagure API.

    Steps: delete the previous test branch, close every leftover PR carrying
    the same title, push a fresh branch with a test file (optionally fixing
    the packit config), then open the PR against the project's default branch.
    The created PR is stored in ``self.pr`` and its head commit in
    ``self.head_commit``.
    """
    from ogr.services.pagure.pull_request import PagurePullRequest

    source_branch = f"test/{self.deployment.name}/opened_pr"
    pr_title = f"Basic test case ({self.deployment.name}): opened PR trigger"
    logging.info("Creating new PR: %s from branch %s", pr_title, source_branch)
    self.delete_previous_branch(source_branch)

    # Close every PR left over from previous runs; closing only when exactly
    # one match exists would leave all of them open once two accumulate.
    for stale_pr in self.project.get_pr_list():
        if stale_pr.title == pr_title:
            logging.debug("Closing existing PR: %s", stale_pr.url)
            stale_pr.close()

    logging.debug("Creating file in new branch: %s", source_branch)
    self.create_file_in_new_branch(source_branch)
    if self.deployment.opened_pr_trigger__packit_yaml_fix:
        self.fix_packit_yaml(source_branch)

    logging.debug("Creating PR via Pagure API...")
    # For Pagure, we need to create PR from the fork to the parent project
    self._ensure_fork_exists()

    # Get fork username to specify where the branch is located
    fork_username = self.account_name

    # Since OGR's Pagure create_pr is not implemented, call the API directly
    # When creating PR from a fork, must specify repo_from parameters
    pr_data = self.project._call_project_api(
        "pull-request",
        "new",
        method="POST",
        data={
            "title": pr_title,
            "branch_to": self.project.default_branch,
            "branch_from": source_branch,
            "initial_comment": (
                "This test case is triggered automatically by our validation script."
            ),
            "repo_from": self.project.repo,
            "repo_from_username": fork_username,
            "repo_from_namespace": self.project.namespace,
        },
    )

    # PagurePullRequest expects the raw PR data, not just the ID
    self.pr = PagurePullRequest(raw_pr=pr_data, project=self.project)
    self.head_commit = self.pr.head_commit
    logging.info("PR created: %s", self.pr.url)
def delete_previous_branch(self, branch: str):
    """
    Best-effort removal of ``branch`` from the fork remote.

    The branch is deleted with ``git push <remote> :<branch>``. Neither a
    failing push nor any other error is raised to the caller; both are only
    logged as warnings.
    """
    try:
        # Preparing the clone first also makes sure the fork exists.
        workdir = self._setup_git_repo()
        # The fork remote is named after the account.
        remote = self.account_name

        logging.info("Attempting to delete branch %s from fork", branch)
        push_delete = ["git", "push", remote, f":{branch}"]
        try:
            subprocess.run(  # noqa: S603
                push_delete,
                cwd=workdir,
                check=True,
                capture_output=True,
                text=True,
            )
        except subprocess.CalledProcessError as err:
            logging.warning("Could not delete branch %s: %s\nstderr: %s", branch, err, err.stderr)
        else:
            logging.info("Deleted branch %s from fork", branch)
    except Exception as err:
        logging.warning("Error deleting branch %s: %s", branch, err)
def create_empty_commit(self, branch: str, commit_msg: str) -> str:
    """
    Add an empty commit to ``branch`` in the local clone and force-push it to the fork.

    :param branch: branch to check out and push
    :param commit_msg: message of the empty commit
    :return: SHA of the new commit as printed by ``git rev-parse HEAD``
    :raises subprocess.CalledProcessError: if any git command fails
    """
    workdir = self._setup_git_repo()

    def git(*args, **extra):
        # Every git call runs inside the clone, captures output and fails loudly.
        return subprocess.run(  # noqa: S603
            ["git", *args],  # noqa: S607
            cwd=workdir,
            check=True,
            capture_output=True,
            **extra,
        )

    logging.info("Checking out branch: %s", branch)
    git("checkout", branch)
    git("commit", "--allow-empty", "-m", commit_msg)

    sha = git("rev-parse", "HEAD", text=True).stdout.strip()
    logging.debug("Created empty commit: %s", sha)

    # The fork remote is named after the account.
    remote = self.account_name
    logging.info("Pushing branch %s to fork", branch)
    git("push", "--force", remote, branch, text=True)

    return sha
async def check_build(self, build_id):
    """
    Watch a Koji task until it finishes or the timeout expires.

    Overrides the base class Copr build check for Pagure projects. Any
    problem is reported by appending to ``self.failure_msg``; nothing is
    returned.

    :param build_id: ID of the Koji task (for Pagure, this is a task ID not a build ID,
        since scratch builds are tasks not builds)
    :return:
    """
    task_id = build_id  # For Pagure, build_id is actually a task_id
    # Human-readable names for the Koji task states, built once for all polls.
    state_names = {
        KOJI_TASK_FREE: "FREE",
        KOJI_TASK_OPEN: "OPEN",
        KOJI_TASK_COMPLETED: "CLOSED",
        KOJI_TASK_CANCELED: "CANCELED",
        KOJI_TASK_ASSIGNED: "ASSIGNED",
        KOJI_TASK_FAILED: "FAILED",
    }
    watch_end = datetime.now(tz=timezone.utc) + timedelta(minutes=self.CHECK_TIME_FOR_BUILD)
    state_reported = None
    logging.info(
        "Watching Koji task %s (timeout: %d minutes)",
        task_id,
        self.CHECK_TIME_FOR_BUILD,
    )

    koji_session = koji()

    while True:
        if datetime.now(tz=timezone.utc) > watch_end:
            self.failure_msg += (
                f"The Koji task did not finish in time ({self.CHECK_TIME_FOR_BUILD} minutes).\n"
            )
            return

        task_info = koji_session.getTaskInfo(task_id)
        if not task_info:
            # NOTE(review): presumably Koji returns None for an unknown task in
            # non-strict mode; report it instead of crashing on ``None.get``.
            self.failure_msg += f"Koji returned no information for task {task_id}.\n"
            return
        task_state = task_info.get("state")

        # Only log and evaluate a state the first time it is seen.
        if task_state != state_reported:
            state_reported = task_state
            state = state_names.get(task_state, task_state)
            logging.debug("Koji task %s state: %s", task_id, state)

            if task_state == KOJI_TASK_COMPLETED:
                # Task completed successfully
                logging.info("Koji task %s completed successfully", task_id)
                return
            if task_state in (KOJI_TASK_CANCELED, KOJI_TASK_FAILED):
                # Task failed or canceled
                logging.error("Koji task %s failed with state: %s", task_id, state)
                self.failure_msg += f"The Koji task was not successful. Koji state: {state}.\n"
                return

        # POLLING_INTERVAL is expressed in minutes.
        await asyncio.sleep(self.POLLING_INTERVAL * 60)
def get_koji_task_for_pr(self) -> dict | None:
    """
    Get the Koji build task associated with this PR's commit.

    Scratch builds are tasks, not builds, so ``listTasks()`` is queried instead
    of ``listBuilds()``. The 20 most recent ``build`` tasks are inspected and
    the first one accepted by ``is_task_for_pr`` is returned.

    Failed tasks are included in the query: a task that fails before the next
    poll must still be found so the failure is reported as such and not as
    "build was not submitted in time".

    :return: the matching task dict, or ``None`` if nothing matches or the
        query fails (the error is logged as a warning)
    """
    koji_session = koji()

    try:
        # Method 'build' is the standard build task type
        tasks = koji_session.listTasks(
            opts={
                "method": "build",
                "decode": True,
                "state": [
                    KOJI_TASK_FREE,
                    KOJI_TASK_OPEN,
                    KOJI_TASK_COMPLETED,
                    KOJI_TASK_CANCELED,
                    KOJI_TASK_ASSIGNED,
                    KOJI_TASK_FAILED,
                ],
            },
            queryOpts={"limit": 20, "order": "-id"},
        )

        # First task whose request references our commit wins
        return next((task for task in tasks if self.is_task_for_pr(task)), None)
    except Exception as e:
        logging.warning("Error fetching Koji tasks: %s", e)
        return None
async def check_rate_limit(self) -> None:
    """
    Verify that enough API quota is left before the tests start.

    The remaining quota is read from the service. When it is below
    ``min_required_rate_limit`` the coroutine sleeps (between 60 and 3600
    seconds, one second per missing request) and checks again, giving up
    after three attempts. Missing rate-limit information or any error while
    checking never blocks the run; the coroutine simply returns.
    """
    from datetime import datetime, timedelta, timezone

    max_retries = 3

    for attempt in range(1, max_retries + 1):
        try:
            # Use OGR's built-in rate limit checking
            remaining = self.project.service.get_rate_limit_remaining()

            if remaining is None:
                # No rate limit information available, nothing to check
                logging.debug(
                    "Rate limit information not available for %s, skipping check",
                    self.project.service.instance_url,
                )
                return

            logging.info(
                "API rate limit for %s: %d requests remaining",
                self.project.service.instance_url,
                remaining,
            )

            if remaining >= self.min_required_rate_limit:
                # Sufficient quota, proceed
                return

            if attempt >= max_retries:
                logging.warning(
                    "Insufficient API quota for %s after %d retries: "
                    "%d remaining (need %d). Proceeding anyway.",
                    self.project.service.instance_url,
                    max_retries,
                    remaining,
                    self.min_required_rate_limit,
                )
                return

            # Roughly one second per missing request, clamped to [60, 3600]
            missing = self.min_required_rate_limit - remaining
            wait_seconds = max(60, min(missing, 3600))
            resume_time = datetime.now(tz=timezone.utc) + timedelta(seconds=wait_seconds)
            logging.warning(
                "Insufficient API quota for %s: %d remaining (need %d). "
                "Waiting %d seconds until %s (retry %d/%d)",
                self.project.service.instance_url,
                remaining,
                self.min_required_rate_limit,
                wait_seconds,
                resume_time.strftime("%H:%M:%S UTC"),
                attempt,
                max_retries,
            )
            await asyncio.sleep(wait_seconds)
            logging.info("Retrying rate limit check...")

        except Exception as e:
            # Log but don't fail on errors
            logging.warning(
                "Could not check rate limit for %s: %s. Proceeding anyway.",
                self.project.service.instance_url,
                e,
            )
            return
New PR test (creates PR via API, no comment) + msg = ( + "Run testcase where the build is triggered by opening " + f"a new PR {self.project.service.instance_url}" + ) + logging.info(msg) + try: + tasks.append( self.test_case_kls( project=self.project, - pr=pr, - trigger=Trigger.comment, deployment=DEPLOYMENT, + existing_prs=all_prs, ).run_test(), ) + test_metadata.append( + { + "type": "new_pr", + "pr_url": None, # Will be created during test + "pr_title": "New PR test", + "trigger": "pr_opened", + }, + ) + except Exception as e: + logging.exception("Failed to create test task: %s", e) + raise - tasks.add(task) - task.add_done_callback(tasks.discard) - + # 2. Push trigger test (pushes to PR, no comment) pr_for_push = [ - pr - for pr in self.project.get_pr_list() - if pr.title.startswith(DEPLOYMENT.push_trigger_tests_prefix) + pr for pr in all_prs if pr.title.startswith(DEPLOYMENT.push_trigger_tests_prefix) ] + logging.debug("Found %d push trigger PRs", len(pr_for_push)) if pr_for_push: msg = ( "Run testcase where the build is triggered by push " f"for {self.project.service.instance_url}" ) - else: - msg = ( - "No testcase found where the build is triggered by push " - f"for {self.project.service.instance_url}" - ) - logging.warning(msg) - if pr_for_push: - task = loop.create_task( + logging.warning(msg) + tasks.append( self.test_case_kls( project=self.project, pr=pr_for_push[0], @@ -99,18 +155,153 @@ async def run(self): deployment=DEPLOYMENT, ).run_test(), ) + test_metadata.append( + { + "type": "push", + "pr_url": pr_for_push[0].url, + "pr_title": pr_for_push[0].title, + "trigger": "push", + }, + ) + else: + msg = ( + "No testcase found where the build is triggered by push " + f"for {self.project.service.instance_url}" + ) + logging.warning(msg) - tasks.add(task) - task.add_done_callback(tasks.discard) + # 3. 
Comment-based tests + basic_prs = [pr for pr in all_prs if pr.title.startswith("Basic test case:")] - msg = ( - "Run testcase where the build is triggered by opening " - f"a new PR {self.project.service.instance_url}" + # Combine all comment-based test PRs + all_comment_prs = [(pr, None) for pr in basic_prs] + + logging.debug( + "Found %d basic test case PRs", + len(basic_prs), ) - logging.info(msg) - task = loop.create_task( - self.test_case_kls(project=self.project, deployment=DEPLOYMENT).run_test(), + if all_comment_prs: + logging.info( + "Running %d comment-based tests for %s", + len(all_comment_prs), + self.project.service.instance_url, + ) + + for pr, comment in all_comment_prs: + tasks.append( + self.test_case_kls( + project=self.project, + pr=pr, + trigger=Trigger.comment, + deployment=DEPLOYMENT, + comment=comment, + ).run_test(), + ) + test_metadata.append( + { + "type": "comment", + "pr_url": pr.url, + "pr_title": pr.title, + "trigger": "comment", + }, + ) + else: + logging.warning( + "No comment-based test PRs found for %s", + self.project.service.instance_url, + ) + + logging.info( + "Created %d test tasks for %s", + len(tasks), + self.project.service.instance_url, ) - tasks.add(task) - task.add_done_callback(tasks.discard) + + # Run tests with staggered starts to avoid API rate limiting + # Stagger delay is configurable per service (test_stagger_seconds) + async def run_with_delay(task, delay): + if delay > 0: + if delay >= self.SECONDS_PER_MINUTE: + minutes = delay // self.SECONDS_PER_MINUTE + logging.info("Waiting %d minutes before starting next test...", minutes) + else: + logging.info("Waiting %d seconds before starting next test...", delay) + await asyncio.sleep(delay) + return await task + + staggered_tasks = [ + run_with_delay(task, i * self.test_stagger_seconds) for i, task in enumerate(tasks) + ] + + # Wait for all tasks to complete + results = await asyncio.gather(*staggered_tasks, return_exceptions=True) + + # Count successful and failed 
tests + passed = sum(1 for r in results if r is True) + failed = sum(1 for r in results if r is False or isinstance(r, Exception)) + total = len(results) + + # Collect failed test details + failed_tests = [] + for i, result in enumerate(results): + if result is False or isinstance(result, Exception): + metadata = test_metadata[i] if i < len(test_metadata) else {} + pr_url = metadata.get("pr_url", "Unknown") + pr_title = metadata.get("pr_title", "Unknown") + trigger = metadata.get("trigger", "unknown") + + if isinstance(result, TestFailureError): + # TestFailureError contains the actual failure message + reason = str(result) + elif isinstance(result, Exception): + reason = f"Exception: {result!s}" + else: + reason = "Test returned False (check logs for details)" + + failed_tests.append( + { + "pr_url": pr_url, + "pr_title": pr_title, + "trigger": trigger, + "reason": reason, + }, + ) + + # Log summary at ERROR level if there are failures, otherwise INFO level + separator = "=" * 60 + log_level = logging.ERROR if failed > 0 else logging.INFO + + summary_lines = [ + separator, + f"Test Summary for {self.project.service.instance_url}:", + f" Total: {total}", + f" Passed: {passed}", + f" Failed: {failed}", + ] + + # Add failed test details if there are any failures + if failed_tests: + summary_lines.append("") + summary_lines.append("Failed Tests:") + for idx, failed_test in enumerate(failed_tests, 1): + summary_lines.append(f" {idx}. 
{failed_test['pr_title']}") + if failed_test["pr_url"]: + summary_lines.append(f" URL: {failed_test['pr_url']}") + summary_lines.append(f" Trigger: {failed_test['trigger']}") + summary_lines.append(f" Reason: {failed_test['reason']}") + + summary_lines.append(separator) + + logging.log(log_level, "\n".join(summary_lines)) + + # Log detailed exceptions separately + for i, result in enumerate(results): + if isinstance(result, Exception): + metadata = test_metadata[i] if i < len(test_metadata) else {} + pr_info = metadata.get("pr_url", f"Task {i}") + logging.error( + "Detailed traceback for %s:", + pr_info, + exc_info=result, + ) diff --git a/src/validation/tests/github.py b/src/validation/tests/github.py index d33d074..ef8b0ec 100644 --- a/src/validation/tests/github.py +++ b/src/validation/tests/github.py @@ -12,6 +12,11 @@ class GithubTests(Tests): test_case_kls = GithubTestcase + # We need at least 100 requests per test, and we run multiple comment tests + # So require at least sufficient quota + min_required_rate_limit = 1200 + # Space out tests to avoid hitting rate limits + test_stagger_seconds = 60 def __init__(self): github_service = GithubService(token=getenv("GITHUB_TOKEN")) diff --git a/src/validation/tests/gitlab.py b/src/validation/tests/gitlab.py index 8317ad1..7f39aaa 100644 --- a/src/validation/tests/gitlab.py +++ b/src/validation/tests/gitlab.py @@ -13,6 +13,13 @@ class GitlabTests(Tests): test_case_kls = GitlabTestcase + # We need at least 50 requests per test, and we run multiple tests + # (new PR, push trigger, comment tests) + # GitLab has more generous limits than GitHub (2000/min vs 5000/hour) + min_required_rate_limit = 250 + # Stagger tests by 60 seconds to avoid race conditions in packit-service + # when multiple events arrive simultaneously for the same project + test_stagger_seconds = 60 def __init__( self, diff --git a/src/validation/tests/pagure.py b/src/validation/tests/pagure.py new file mode 100644 index 0000000..1b4ac2c --- /dev/null 
class PagureTests(Tests):
    """Test suite for a Pagure instance, run against the ``python-requre`` project."""

    test_case_kls = PagureTestcase

    def __init__(
        self,
        instance_url="https://src.fedoraproject.org/",
        namespace="rpms",
        token_name="PAGURE_TOKEN",
    ):
        # The API token is read from the environment variable named by ``token_name``.
        service = PagureService(token=getenv(token_name), instance_url=instance_url)
        self.project: PagureProject = service.get_project(
            repo="python-requre",
            namespace=namespace,
        )
        # Set once a Kerberos ticket has been obtained in ``run``.
        self._kerberos_principal = None

    async def _acquire_kerberos_ticket(self):
        """Obtain a ticket from ``PAGURE_KEYTAB`` if it is set; failures are only logged."""
        keytab_file = getenv("PAGURE_KEYTAB")
        if not keytab_file:
            return
        try:
            self._kerberos_principal = await init_kerberos_ticket(keytab_file)
            logging.info("Kerberos ticket initialized for Pagure tests")
        except KerberosError as e:
            logging.error("Failed to initialize Kerberos ticket: %s", e)
            logging.warning("Continuing without Kerberos ticket - some operations may fail")

    async def _release_kerberos_ticket(self):
        """Destroy the ticket obtained earlier, if any; failures are only logged."""
        if not self._kerberos_principal:
            return
        try:
            await destroy_kerberos_ticket(self._kerberos_principal)
            logging.info("Kerberos ticket destroyed")
        except Exception as e:
            logging.warning("Failed to destroy Kerberos ticket: %s", e)

    async def run(self):
        """Run the inherited test suite with a Kerberos ticket set up around it."""
        await self._acquire_kerberos_ticket()
        try:
            # Run the actual tests
            await super().run()
        finally:
            # The ticket is cleaned up even when the tests raise.
            await self._release_kerberos_ticket()