diff --git a/babs/base.py b/babs/base.py index 644018a3..3bec1186 100644 --- a/babs/base.py +++ b/babs/base.py @@ -3,6 +3,7 @@ import os import os.path as op import subprocess +from dataclasses import replace from pathlib import Path from urllib.parse import urlparse @@ -12,6 +13,13 @@ from babs.input_datasets import InputDatasets, OutputDatasets from babs.scheduler import ( request_all_job_status, + run_squeue, +) +from babs.status import ( + read_job_status_csv, + update_from_branches, + update_from_scheduler, + write_job_status_csv, ) from babs.system import validate_queue from babs.utils import ( @@ -20,12 +28,9 @@ get_results_branches, identify_running_jobs, read_yaml, - results_branch_dataframe, results_status_columns, scheduler_status_columns, status_dtypes, - update_job_batch_status, - update_results_status, validate_processing_level, ) @@ -381,32 +386,50 @@ def _get_results_branches(self) -> list[str]: """Get the results branch names from the output RIA in a list.""" return get_results_branches(self.output_ria_data_dir) - def _update_results_status(self) -> None: - """ - Update the status of jobs based on results in the output RIA and zip files. - """ - - previous_job_completion_df = self.get_job_status_df() - - # Step 1: get a list of branches in the output ria to update the status - list_branches = self._get_results_branches() - completed_branches_df = results_branch_dataframe(list_branches, self.processing_level) + def _update_results_status(self) -> dict: + """Update job statuses from external sources and write to CSV. 
- # Get any completed merged zip files - merged_zip_completion_df = self._get_merged_results_from_analysis_dir() - - # Update the results status - current_status_df = update_results_status( - previous_job_completion_df, completed_branches_df, merged_zip_completion_df - ) - - # Part 2: Update which jobs are running - currently_running_df = self.get_currently_running_jobs_df() - current_status_df = update_job_batch_status(current_status_df, currently_running_df) - current_status_df['has_results'] = ( - current_status_df['has_results'].astype('boolean').fillna(False) + Returns + ------- + dict[tuple, JobStatus] + Updated statuses keyed by (sub_id,) or (sub_id, ses_id). + """ + # Read current state from CSV + if op.exists(self.job_status_path_abs): + statuses = read_job_status_csv(self.job_status_path_abs) + else: + statuses = {} + + # Update from results branches in output RIA + branches = self._get_results_branches() + statuses = update_from_branches(statuses, branches) + + # Update from merged zip files in analysis dir + merged_zip_df = self._get_merged_results_from_analysis_dir() + if not merged_zip_df.empty: + for _, row in merged_zip_df.iterrows(): + ses_id = row.get('ses_id') if 'ses_id' in merged_zip_df.columns else None + key = (row['sub_id'], ses_id) if ses_id else (row['sub_id'],) + if key in statuses: + statuses[key] = replace(statuses[key], has_results=True) + + # Update from scheduler (squeue) + job_ids = sorted( + { + job.job_id + for job in statuses.values() + if job.submitted and not job.has_results and job.job_id is not None + } ) - current_status_df.to_csv(self.job_status_path_abs, index=False) + raw_squeue_parts = [] + for jid in job_ids: + raw_squeue_parts.append(run_squeue(self.queue, jid)) + raw_squeue = ''.join(raw_squeue_parts) + statuses = update_from_scheduler(statuses, raw_squeue) + + # Write updated statuses + write_job_status_csv(self.job_status_path_abs, statuses) + return statuses def get_latest_submitted_jobs_df(self): """ diff --git 
a/babs/bootstrap.py b/babs/bootstrap.py index d8af8d09..696bc9c3 100644 --- a/babs/bootstrap.py +++ b/babs/bootstrap.py @@ -1,5 +1,6 @@ """This is the main module.""" +import csv import os import os.path as op import subprocess @@ -7,19 +8,16 @@ from pathlib import Path import datalad.api as dlapi -import pandas as pd import yaml from jinja2 import Environment, PackageLoader, StrictUndefined from babs.base import BABS from babs.container import Container from babs.input_datasets import InputDatasets +from babs.status import create_initial_statuses, write_job_status_csv from babs.system import System, validate_queue from babs.utils import ( get_datalad_version, - results_status_columns, - results_status_default_values, - status_dtypes, validate_processing_level, ) @@ -611,22 +609,12 @@ def clean_up(self): print('\nCreated BABS project has been cleaned up.') def _create_initial_job_status_csv(self): - """ - Create the initial job status csv file. - """ + """Create the initial job status csv file.""" if op.exists(self.job_status_path_abs): return - # Load the complete list of subjects and optionally sessions - df_sub = pd.read_csv(self.list_sub_path_abs) - df_job = df_sub.copy() - - # Fill the columns that should get default values - for column_name, default_value in results_status_default_values.items(): - df_job[column_name] = default_value - - # ensure dtypes for all the columns - for column_name in results_status_columns: - df_job[column_name] = df_job[column_name].astype(status_dtypes[column_name]) + with open(self.list_sub_path_abs, newline='') as f: + sub_ses_list = list(csv.DictReader(f)) - df_job.to_csv(self.job_status_path_abs, index=False) + statuses = create_initial_statuses(sub_ses_list) + write_job_status_csv(self.job_status_path_abs, statuses) diff --git a/babs/interaction.py b/babs/interaction.py index b6a779cb..1e16b403 100644 --- a/babs/interaction.py +++ b/babs/interaction.py @@ -144,7 +144,5 @@ def babs_status(self): """ Check job status and makes 
a nice report. """ - self._update_results_status() - currently_running_df = self.get_currently_running_jobs_df() - current_results_df = self.get_job_status_df() - report_job_status(current_results_df, currently_running_df, self.analysis_path) + statuses = self._update_results_status() + report_job_status(statuses, self.analysis_path) diff --git a/babs/scheduler.py b/babs/scheduler.py index c0650942..69677ed7 100644 --- a/babs/scheduler.py +++ b/babs/scheduler.py @@ -6,9 +6,53 @@ import pandas as pd import yaml +from babs.status import SchedulerState from babs.utils import get_username, scheduler_status_columns, status_dtypes +def run_squeue(queue, job_id: int) -> str: + """Run squeue and return raw pipe-delimited output. + + Parameters + ---------- + queue : str + Job scheduling system type (only 'slurm' supported). + job_id : int + The job array ID to query. + + Returns + ------- + str + Raw squeue stdout (pipe-delimited lines), or empty string if + no jobs found. + """ + if queue != 'slurm': + raise NotImplementedError(f'Queue {queue!r} is not supported.') + if not check_slurm_available(): + raise RuntimeError('Slurm commands are not available on this system.') + + username = get_username() + cmd = [ + 'squeue', + '-u', + username, + '-r', + '--noheader', + '--format=%i|%t|%M|%l|%D|%C|%P|%j', + f'-j{job_id}', + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 1 and 'Invalid job id specified' in result.stderr: + return '' + if result.returncode != 0: + raise RuntimeError( + f'squeue failed with return code {result.returncode}\nstderr: {result.stderr}' + ) + return result.stdout + + def check_slurm_available() -> bool: """Check if Slurm commands are available on the system. 
@@ -264,21 +308,16 @@ def submit_one_test_job(analysis_path, queue): return job_id -def report_job_status(current_results_df, currently_running_df, analysis_path): +def report_job_status(statuses, analysis_path): """ Print a report that summarizes the overall status of a BABS project. - This will show how many of the jobs have been completed, - how many are still running, and how many have failed. - - Parameters: - ------------- - current_results_df: pd.DataFrame - dataframe the accurately reflects which tasks have finished - currently_running_df: pd.DataFrame - dataframe of currently running tasks - analysis_path: str - path to the `analysis` folder of a `BABS` project + Parameters + ---------- + statuses : dict[tuple, JobStatus] + Current job statuses keyed by (sub_id,) or (sub_id, ses_id). + analysis_path : str + Path to the ``analysis`` folder of a BABS project. """ from jinja2 import Environment, PackageLoader, StrictUndefined @@ -291,12 +330,13 @@ def report_job_status(current_results_df, currently_running_df, analysis_path): ) template = env.get_template('job_status_report.jinja') - total_jobs = current_results_df.shape[0] - total_submitted = int(current_results_df['submitted'].sum()) - total_is_done = int(current_results_df['has_results'].sum()) - total_pending = int((currently_running_df['state'] == 'PD').sum()) - total_running = int((currently_running_df['state'] == 'R').sum()) - total_failed = int(current_results_df['is_failed'].sum()) + jobs = list(statuses.values()) + total_jobs = len(jobs) + total_submitted = sum(1 for j in jobs if j.submitted) + total_is_done = sum(1 for j in jobs if j.has_results) + total_pending = sum(1 for j in jobs if j.scheduler_state == SchedulerState.PENDING) + total_running = sum(1 for j in jobs if j.scheduler_state == SchedulerState.RUNNING) + total_failed = sum(1 for j in jobs if j.is_failed) print( template.render( diff --git a/babs/status.py b/babs/status.py new file mode 100644 index 00000000..ec505281 --- /dev/null +++ 
b/babs/status.py @@ -0,0 +1,356 @@ +"""Job status data model and CSV I/O for babs status.""" + +import csv +import os +import re +from dataclasses import dataclass, replace +from enum import Enum + + +class SchedulerState(Enum): + """States a job can be in relative to the scheduler. + + PENDING through CONFIGURING use SLURM squeue state codes as values + for backward compatibility with the existing job_status.csv format. + + TODO: decouple SLURM state codes from the BABS data model so the + CSV stores scheduler-agnostic values instead of SLURM strings. + """ + + NOT_SUBMITTED = 'NOT_SUBMITTED' + PENDING = 'PD' # SLURM squeue state + RUNNING = 'R' # SLURM squeue state + COMPLETING = 'CG' # SLURM squeue state + CONFIGURING = 'CF' # SLURM squeue state + DONE = 'DONE' + # future: COMPLETED, FAILED, CANCELLED, TIMEOUT (from sacct) + + @classmethod + def from_slurm_state(cls, state_str: str) -> 'SchedulerState': + """Convert a SLURM squeue state string to a SchedulerState.""" + for member in cls: + if member.value == state_str: + return member + raise ValueError(f'Unknown SLURM scheduler state: {state_str!r}') + + +@dataclass +class JobStatus: + """Status of a single job (one subject, optionally one session).""" + + sub_id: str + ses_id: str | None + scheduler_state: SchedulerState + has_results: bool + job_id: int | None + task_id: int | None + time_used: str + time_limit: str + nodes: int + cpus: int + partition: str + name: str + + @property + def is_failed(self) -> bool: + return self.scheduler_state == SchedulerState.DONE and not self.has_results + + @property + def submitted(self) -> bool: + return self.scheduler_state != SchedulerState.NOT_SUBMITTED + + @property + def key(self) -> tuple: + if self.ses_id is not None: + return (self.sub_id, self.ses_id) + return (self.sub_id,) + + +# -- CSV I/O ----------------------------------------------------------------- + +_CSV_COLUMNS_SUBJECT = [ + 'sub_id', + 'submitted', + 'is_failed', + 'state', + 'time_used', + 
'time_limit', + 'nodes', + 'cpus', + 'partition', + 'name', + 'job_id', + 'task_id', + 'has_results', +] + +_CSV_COLUMNS_SESSION = [ + 'sub_id', + 'ses_id', + 'submitted', + 'is_failed', + 'state', + 'time_used', + 'time_limit', + 'nodes', + 'cpus', + 'partition', + 'name', + 'job_id', + 'task_id', + 'has_results', +] + +_STATE_TO_CSV = { + SchedulerState.NOT_SUBMITTED: '', + SchedulerState.PENDING: 'PD', + SchedulerState.RUNNING: 'R', + SchedulerState.COMPLETING: 'CG', + SchedulerState.CONFIGURING: 'CF', + SchedulerState.DONE: '', +} + +_SLURM_SQUEUE_STATES = {'PD', 'R', 'CG', 'CF'} + + +def _parse_bool(value: str) -> bool: + return value.strip().lower() == 'true' + + +def _parse_optional_int(value: str) -> int | None: + value = value.strip() + if value in ('', 'NA', 'nan'): + return None + return int(float(value)) + + +def _job_status_from_row(row: dict) -> JobStatus: + """Construct a JobStatus from a CSV row dict. + + ``is_failed`` and ``submitted`` columns are ignored on read — + they are derived properties on JobStatus. 
+ """ + ses_id = row.get('ses_id', '').strip() or None + state_str = row.get('state', '').strip() + submitted = _parse_bool(row.get('submitted', 'False')) + if state_str in _SLURM_SQUEUE_STATES: + scheduler_state = SchedulerState.from_slurm_state(state_str) + elif submitted: + scheduler_state = SchedulerState.DONE + else: + scheduler_state = SchedulerState.NOT_SUBMITTED + + return JobStatus( + sub_id=row['sub_id'].strip(), + ses_id=ses_id, + scheduler_state=scheduler_state, + has_results=_parse_bool(row.get('has_results', 'False')), + job_id=_parse_optional_int(row.get('job_id', '')), + task_id=_parse_optional_int(row.get('task_id', '')), + time_used=row.get('time_used', '').strip(), + time_limit=row.get('time_limit', '').strip(), + nodes=int(float(row.get('nodes', '0').strip() or '0')), + cpus=int(float(row.get('cpus', '0').strip() or '0')), + partition=row.get('partition', '').strip(), + name=row.get('name', '').strip(), + ) + + +def read_job_status_csv(path: str) -> dict[tuple, JobStatus]: + """Read job_status.csv and return a dict keyed by (sub_id,) or (sub_id, ses_id).""" + statuses: dict[tuple, JobStatus] = {} + with open(path, newline='') as f: + reader = csv.DictReader(f) + for row in reader: + job = _job_status_from_row(row) + statuses[job.key] = job + return statuses + + +def _job_status_to_row(job: JobStatus, session_level: bool) -> dict: + """Convert a JobStatus to a CSV row dict.""" + row = { + 'sub_id': job.sub_id, + 'submitted': str(job.submitted), + 'is_failed': str(job.is_failed), + 'state': _STATE_TO_CSV[job.scheduler_state], + 'time_used': job.time_used, + 'time_limit': job.time_limit, + 'nodes': str(job.nodes), + 'cpus': str(job.cpus), + 'partition': job.partition, + 'name': job.name, + 'job_id': str(job.job_id) if job.job_id is not None else '', + 'task_id': str(job.task_id) if job.task_id is not None else '', + 'has_results': str(job.has_results), + } + if session_level: + row['ses_id'] = job.ses_id or '' + return row + + +def 
write_job_status_csv(path: str, statuses: dict[tuple, JobStatus]) -> None: + """Write job statuses to job_status.csv.""" + if not statuses: + # Keep on-disk state aligned with in-memory status: + # if there are no tracked jobs, remove any stale CSV. + if os.path.exists(path): + os.remove(path) + return + + session_level = any(job.ses_id is not None for job in statuses.values()) + columns = _CSV_COLUMNS_SESSION if session_level else _CSV_COLUMNS_SUBJECT + + with open(path, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=columns) + writer.writeheader() + for job in statuses.values(): + writer.writerow(_job_status_to_row(job, session_level)) + + +# -- Update functions --------------------------------------------------------- + +# Branch name pattern: job-<job_id>-<task_id>-<sub_id>[-<ses_id>] +_BRANCH_PATTERN = re.compile( + r'job-(?P<job_id>\d+)-?(?P<task_id>\d+)?[-]' + r'(?P<sub_id>sub-[^-]+)(?:-(?P<ses_id>ses-[^-]+))?' +) + + +def update_from_branches( + statuses: dict[tuple, JobStatus], + branches: list[str], +) -> dict[tuple, JobStatus]: + """Update statuses with results information from branch names. + + Branches that don't match any existing status key are ignored + (they may belong to subjects not in the inclusion list). 
+ """ + # Parse branches into a lookup of key -> (job_id, task_id) + branch_results: dict[tuple, tuple[int | None, int | None]] = {} + for branch in branches: + match = _BRANCH_PATTERN.match(branch) + if not match: + continue + sub_id = match.group('sub_id') + ses_id = match.group('ses_id') + job_id = int(match.group('job_id')) if match.group('job_id') else None + task_id = int(match.group('task_id')) if match.group('task_id') else None + key = (sub_id, ses_id) if ses_id else (sub_id,) + branch_results[key] = (job_id, task_id) + + updated = {} + for key, job in statuses.items(): + if key in branch_results: + branch_job_id, branch_task_id = branch_results[key] + updated[key] = replace( + job, + has_results=True, + job_id=branch_job_id if branch_job_id is not None else job.job_id, + task_id=branch_task_id if branch_task_id is not None else job.task_id, + ) + else: + updated[key] = job + return updated + + +def update_from_scheduler( + statuses: dict[tuple, JobStatus], + raw_squeue: str, +) -> dict[tuple, JobStatus]: + """Update statuses with live scheduler information from raw squeue output. + + Parses squeue output (pipe-delimited: job_id|state|time|limit|nodes|cpus|partition|name), + joins with existing statuses via (job_id, task_id), and updates scheduler fields. + + Jobs that were previously RUNNING/PENDING but are no longer in squeue + transition to DONE. 
+ """ + # Parse squeue output into a lookup of (job_id, task_id) -> fields + squeue_by_id: dict[tuple[int, int], dict] = {} + for line in raw_squeue.strip().splitlines(): + if not line.strip(): + continue + parts = line.strip().split('|') + if len(parts) != 8: + continue + raw_job_id = parts[0] # format: array_id_task_id + id_parts = raw_job_id.split('_') + if len(id_parts) != 2: + raise ValueError(f'Expected array job format "jobid_taskid", got {raw_job_id!r}') + job_id = int(id_parts[0]) + task_id = int(id_parts[1]) + squeue_by_id[(job_id, task_id)] = { + 'state': parts[1], + 'time_used': parts[2], + 'time_limit': parts[3], + 'nodes': int(parts[4]), + 'cpus': int(parts[5]), + 'partition': parts[6], + 'name': parts[7], + } + + # Build reverse lookup: key -> (job_id, task_id) for matching + key_to_ids: dict[tuple, tuple[int, int]] = {} + for key, job in statuses.items(): + if job.job_id is not None and job.task_id is not None: + key_to_ids[key] = (job.job_id, job.task_id) + + updated = {} + for key, job in statuses.items(): + ids = key_to_ids.get(key) + squeue_info = squeue_by_id.get(ids) if ids else None + + if squeue_info is not None: + # Job is in the scheduler + updated[key] = replace( + job, + scheduler_state=SchedulerState.from_slurm_state(squeue_info['state']), + time_used=squeue_info['time_used'], + time_limit=squeue_info['time_limit'], + nodes=squeue_info['nodes'], + cpus=squeue_info['cpus'], + partition=squeue_info['partition'], + name=squeue_info['name'], + ) + elif job.submitted: + # Was submitted, not in scheduler anymore -> DONE + updated[key] = replace(job, scheduler_state=SchedulerState.DONE) + else: + updated[key] = job + return updated + + +# -- Initialization ----------------------------------------------------------- + + +def create_initial_statuses( + sub_ses_list: list[dict], +) -> dict[tuple, JobStatus]: + """Create initial JobStatus entries from a list of subject/session dicts. 
+ + Parameters + ---------- + sub_ses_list : list of dict + Each dict has 'sub_id' and optionally 'ses_id'. + """ + statuses: dict[tuple, JobStatus] = {} + for entry in sub_ses_list: + sub_id = entry['sub_id'] + ses_id = entry.get('ses_id') + job = JobStatus( + sub_id=sub_id, + ses_id=ses_id, + scheduler_state=SchedulerState.NOT_SUBMITTED, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ) + statuses[job.key] = job + return statuses diff --git a/babs/update.py b/babs/update.py index e47cc01d..3b5974be 100644 --- a/babs/update.py +++ b/babs/update.py @@ -1,9 +1,17 @@ """This is the main module.""" +import os.path as op + import datalad.api as dlapi import pandas as pd from babs.base import BABS +from babs.status import ( + JobStatus, + SchedulerState, + read_job_status_csv, + write_job_status_csv, +) EMPTY_JOB_STATUS_DF = pd.DataFrame( columns=['sub_id', 'ses_id', 'task_id', 'job_id', 'has_results'] @@ -99,31 +107,37 @@ def babs_update_input_data( def _update_job_status_with_new_inclusion( self, added_rows: pd.DataFrame, removed_rows: pd.DataFrame ): - """ - Update the job status dataframe with the new inclusion dataframe. 
- """ - # Get the job status dataframe - job_status_df = self.get_job_status_df() - updated_job_status_df = job_status_df.copy() + """Update the job status with added/removed subjects or sessions.""" + if not op.exists(self.job_status_path_abs): + statuses = {} + else: + statuses = read_job_status_csv(self.job_status_path_abs) + if not removed_rows.empty: - # Use an anti-join to remove matching rows - updated_job_status_df = ( - job_status_df.merge(removed_rows, how='left', indicator=True) - .query('_merge == "left_only"') - .drop('_merge', axis=1) - ) + for _, row in removed_rows.iterrows(): + ses_id = row.get('ses_id') if 'ses_id' in removed_rows.columns else None + key = (row['sub_id'], ses_id) if ses_id else (row['sub_id'],) + statuses.pop(key, None) if not added_rows.empty: - # Update the job status dataframe with the new inclusion dataframe - updated_job_status_df = pd.concat( - [job_status_df, added_rows], axis=0, ignore_index=True - ) - - # Ensure the has_results column is a boolean - for column in ['has_results', 'submitted']: - updated_job_status_df[column] = ( - updated_job_status_df[column].astype('boolean').fillna(False) - ) - - # Save the job status dataframe - updated_job_status_df.to_csv(self.job_status_path_abs, index=False) + for _, row in added_rows.iterrows(): + ses_id = row.get('ses_id') if 'ses_id' in added_rows.columns else None + sub_id = row['sub_id'] + key = (sub_id, ses_id) if ses_id else (sub_id,) + if key not in statuses: + statuses[key] = JobStatus( + sub_id=sub_id, + ses_id=ses_id, + scheduler_state=SchedulerState.NOT_SUBMITTED, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ) + + write_job_status_csv(self.job_status_path_abs, statuses) diff --git a/babs/utils.py b/babs/utils.py index 71a245f7..b4fc01ab 100644 --- a/babs/utils.py +++ b/babs/utils.py @@ -58,21 +58,6 @@ def get_latest_submitted_jobs_columns(processing_level): 'is_failed', ] + 
 scheduler_status_columns -results_status_default_values = { - 'submitted': False, - 'has_results': False, - 'is_failed': False, - 'job_id': -1, - 'task_id': -1, - 'state': '', - 'time_used': '', - 'time_limit': '', - 'nodes': 0, - 'cpus': 0, - 'partition': '', - 'name': '', -} - def get_datalad_version(): return version('datalad') @@ -453,73 +438,6 @@ get_results_branches_from_ria(ria_data_dir, timeout=30): return branches -def results_branch_dataframe(branches, processing_level) -> pd.DataFrame: - """ - Create a dataframe from a list of branches. - - Parameters: - -------------- - branches: list - list of branches - processing_level: str - processing level - - Returns: - ------------- - df: pd.DataFrame - dataframe with the following columns: - job_id: int - task_id: int - sub_id: str - ses_id: str - has_results: bool - - Examples: - --------- - For sessionwise processing, the returned dataframe will look like: - job_id task_id sub_id ses_id has_results - 123 1 sub-0000 ses-0000 True - 123 2 sub-0001 ses-0001 True - - for subjectwise processing, the returned dataframe will look like: - - job_id task_id sub_id has_results - 123 1 sub-0000 True - 123 2 sub-0001 True - - """ - import re - - # Create a pattern with named groups - ses_id is optional - pattern = ( - r'job-(?P<job_id>\d+)-?(?P<task_id>\d+)?[-]' - r'(?P<sub_id>sub-[^-]+)(?:-(?P<ses_id>ses-[^-]+))?' 
- ) - - result_data = [] - for branch in branches: - match = re.match(pattern, branch) - if match: - # Convert match to dictionary and add has_results - result = match.groupdict() - result['has_results'] = True - result_data.append(result) - - df = pd.DataFrame(result_data) - if processing_level == 'session': - columns = ['job_id', 'task_id', 'sub_id', 'ses_id', 'has_results'] - else: - columns = ['job_id', 'task_id', 'sub_id', 'has_results'] - - if df.empty: - return pd.DataFrame(columns=columns) - - for column_name in columns: - df[column_name] = df[column_name].astype(status_dtypes[column_name]) - - return df[columns] - - def identify_running_jobs(last_submitted_jobs_df, currently_running_df): """ The currently-running jobs do not have the subject/session information. @@ -549,115 +467,6 @@ def identify_running_jobs(last_submitted_jobs_df, currently_running_df): ) -def update_results_status( - previous_job_completion_df, job_completion_df, merged_zip_completion_df=None -): - """ - Update a status dataframe with a results branch dataframe. 
- - Parameters: - -------------- - previous_job_completion_df: pd.DataFrame - previous job completion dataframe - job_completion_df: pd.DataFrame - job completion dataframe from results_branch_dataframe() - merged_zip_completion_df: pd.DataFrame or None - job completion dataframe from BABS._get_merged_results_from_analysis_dir() - - Returns: - ------------- - df: pd.DataFrame - updated job status dataframe - - """ - # Determine if we should use ses_id for merging - # Check previous_df and both completion dataframes - use_sesid = 'ses_id' in previous_job_completion_df - if use_sesid: - # Check if either completion dataframe has ses_id - # If job_completion_df is empty, check merged_zip_completion_df to determine columns - has_sesid_in_job = not job_completion_df.empty and 'ses_id' in job_completion_df - has_sesid_in_merged = ( - merged_zip_completion_df is not None - and not merged_zip_completion_df.empty - and 'ses_id' in merged_zip_completion_df - ) - # If previous_df has ses_id but neither completion df has it, don't use ses_id for merge - if not (has_sesid_in_job or has_sesid_in_merged): - use_sesid = False - - merge_on = ['sub_id', 'ses_id'] if use_sesid else ['sub_id'] - - # If we have a merged zip completion dataframe, - # we need to concatenate it with the job completion dataframe - if merged_zip_completion_df is not None and not merged_zip_completion_df.empty: - merged_df = merged_zip_completion_df.copy() - merged_df['has_results'] = True - merged_df['task_id'] = pd.NA - merged_df['job_id'] = pd.NA - - # Ensure both dataframes have the same columns before concatenation - if job_completion_df.empty: - job_completion_df = pd.DataFrame(columns=merged_df.columns) - elif merged_df.empty: - merged_df = pd.DataFrame(columns=job_completion_df.columns) - - job_completion_df = pd.concat([job_completion_df, merged_df], axis=0, ignore_index=True) - - # The job and task ids all need to be cleared out and replaced with the new ones - # from the job_completion_df, which 
is based on the latest results branches - updated_results_df = pd.merge( - previous_job_completion_df.drop(columns=['has_results']), - job_completion_df, - on=merge_on, - how='left', - suffixes=('', '_completion'), - ) - - # Update the job_id and task_id columns - for col in ['job_id', 'task_id']: - update_mask = updated_results_df[col + '_completion'].notna() - updated_results_df.loc[update_mask, col] = updated_results_df.loc[ - update_mask, col + '_completion' - ] - # For merged zip completion, job_id and task_id should be NA even if not in completion df - # This happens when has_results is True but job_id/task_id_completion are NA - merged_zip_mask = ( - updated_results_df['has_results'].fillna(False).infer_objects(copy=False).astype(bool) - & updated_results_df[col + '_completion'].isna() - ) - updated_results_df.loc[merged_zip_mask, col] = pd.NA - - # Fill NaN values with appropriate defaults - # Convert to Python boolean for compatibility with 'is True' checks in tests - # Use object dtype to store Python booleans instead of numpy booleans - has_results_filled = ( - updated_results_df['has_results'].fillna(False).infer_objects(copy=False).astype(bool) - ) - has_results_list = [bool(x) if pd.notna(x) else False for x in has_results_filled] - updated_results_df['has_results'] = pd.Series(has_results_list, dtype=object) - updated_results_df['submitted'] = ( - updated_results_df['submitted'].astype('boolean').fillna(False) - ) - updated_results_df['state'] = updated_results_df['state'].fillna('') - - # Now compute is_failed with NaN-safe operations - updated_results_df['is_failed'] = ( - updated_results_df['submitted'] - & ~updated_results_df['has_results'] - & ~updated_results_df['state'].isin(['PD', 'R']) - ) - - # Drop all completion columns (job_id, task_id, and any other overlapping - # columns like ses_id_completion when use_sesid was False in a prior run) - completion_suffix_columns = [ - col for col in updated_results_df.columns if 
col.endswith('_completion') - ] - updated_results_df = updated_results_df.drop(columns=completion_suffix_columns) - - return updated_results_df - - def update_submitted_job_ids(results_df, submitted_df): """Update the most recent job and task ids in the status df. @@ -687,78 +496,6 @@ def update_submitted_job_ids(results_df, submitted_df): return merged -def update_job_batch_status(status_df, job_submit_df): - """ - Update the status dataframe with the job submission information. - - Parameters: - ----------- - status_df: pd.DataFrame - status dataframe. Be sure has_results is up to date. - job_submit_df: pd.DataFrame - the current status of job submission: must have - - Returns: - -------- - pd.DataFrame - updated status dataframe - - """ - - if job_submit_df.empty: - return status_df - - if 'sub_id' not in job_submit_df: - raise ValueError('job_submit_df must have a sub_id column') - - use_sesid = 'ses_id' in status_df and 'ses_id' in job_submit_df - merge_on = ['sub_id', 'ses_id'] if use_sesid else ['sub_id'] - - # First merge to get the most recent results information - updated_status_df = pd.merge( - status_df, job_submit_df, on=merge_on, how='left', suffixes=('', '_batch') - ) - - # Updated which jobs have failed. If they have been submitted, do not have results, - # and are not currently running, they have failed. 
- currently_running = updated_status_df['state_batch'].isin(['PD', 'R']) - - for update_col in [ - 'job_id', - 'task_id', - 'state', - 'time_used', - 'time_limit', - 'nodes', - 'cpus', - 'partition', - 'name', - ]: - # Update job_id where update_mask is True - updated_status_df.loc[currently_running, update_col] = updated_status_df.loc[ - currently_running, f'{update_col}_batch' - ].astype(status_dtypes[update_col]) - - # Drop the batch columns - updated_status_df = updated_status_df.drop( - columns=[col for col in updated_status_df.columns if col.endswith('_batch')] - ) - - # replace NaN with False in has_results - updated_status_df['has_results'] = ( - updated_status_df['has_results'].astype('boolean').fillna(False) - ) - - # Update some of the dynamic columns - updated_status_df['is_failed'] = ( - updated_status_df['submitted'] - & ~updated_status_df['has_results'] - & ~updated_status_df['state'].isin(['PD', 'R']) - ) - - return updated_status_df - - def get_repo_hash(repo_path): """ Get the hash of the current commit of a git repository. diff --git a/design/status-without-pandas.md b/design/status-without-pandas.md new file mode 100644 index 00000000..4ac63b39 --- /dev/null +++ b/design/status-without-pandas.md @@ -0,0 +1,297 @@ +# babs status simplification + +## Problems + +1. **`is_failed` computed in 3 separate places** with inconsistent pandas + dtype handling. This has already caused a real bug: a completed job with + results gets counted as both succeeded AND failed (confirmed on cluster). + +2. **Pandas dtype footguns.** `has_results` was changed from `dtype='boolean'` + to `dtype=object` (Python bools) to fix a test assertion. Bitwise NOT on + object-dtype `True` gives `-2` (truthy) instead of `False`. NaN handling + requires `.fillna(False)` and `.astype('boolean')` scattered throughout. + +3. **Magic strings for scheduler state.** States like `'PD'`, `'R'`, `'CG'` + are bare strings compared via `.isin()` — no type safety, easy to typo. + +4. 
**Derived state stored in CSV.** `is_failed` is written to `job_status.csv` + then re-derived on every read, creating opportunities for staleness and + inconsistency. + +5. **Prior refactor attempt failed.** PR #302 (Matt) tried to replace pandas + with Python objects but broke logic and was never merged — it still used + pandas for I/O and stored `is_failed` instead of deriving it. + +## Goal + +Drop pandas from `babs status`. Replace DataFrames with dataclasses + dicts, +`csv` module instead of pandas I/O, enums instead of magic strings, and +derive `is_failed` in one place instead of three. + +## Related PRs (for context, not on this branch) + +- https://github.com/PennLINC/babs/pull/354 — `babs status --wait` + (branch: `mechababs-working-branch`). Has the dtype bug described above. +- https://github.com/PennLINC/babs/pull/302 — Matt's WIP (see problem #5). + +## Tien's input (2026-04-02) + +- Prior pd-removal attempt broke logic, was never merged +- Current babs status "isn't in a great state" +- Happy to review a simpler solution + +## Data flow + +### External sources + +1. **Git branches in output RIA** → `has_results` (via `_get_results_branches()`) +2. **Zip files in analysis dir** → `has_results` (via `_get_merged_results_from_analysis_dir()`) + - TODO: in practice finds nothing — `babs merge` pushes to output RIA, + zips don't end up in analysis dir. Confirmed on live cluster. +3. **squeue** → scheduler state (`PD`/`R`/`CG`/`CF`) (via `run_squeue()`) +4. **job_status.csv** → persisted state + - `submitted` — not derivable from external sources + - `has_results` — also persisted, branches deleted after `babs merge` + +### Processing flow (after this refactor) + +```mermaid +sequenceDiagram + participant CLI as babs_status()
interaction.py + participant Base as _update_results_status()
base.py + participant Status as status.py + participant CSV as job_status.csv + participant RIA as Output RIA + participant Slurm as SLURM squeue + + CLI->>Base: babs_status() + Base->>Status: read_job_status_csv() + Status->>CSV: read + CSV-->>Status: rows + Status-->>Base: dict[key, JobStatus] + + Base->>RIA: _get_results_branches() + RIA-->>Base: branch names + Base->>Status: update_from_branches(statuses, branches) + Status-->>Base: updated statuses + + Base->>Slurm: run_squeue() per job_id + Slurm-->>Base: raw squeue text + Base->>Status: update_from_scheduler(statuses, raw_squeue) + Status-->>Base: updated statuses + + Base->>Status: write_job_status_csv() + Status->>CSV: write + Base-->>CLI: statuses dict + CLI->>Status: report_job_status(statuses) + Note over CLI: renders jinja template +``` + +### Data out + +`report_job_status()` counts from the statuses dict and renders a jinja +template: total_jobs, total_submitted, total_is_done, total_pending, +total_running, total_failed. + +## Design + +### Data model + +```python +class SchedulerState(Enum): + NOT_SUBMITTED = "NOT_SUBMITTED" + PENDING = "PD" + RUNNING = "R" + COMPLETING = "CG" + CONFIGURING = "CF" + DONE = "DONE" # left scheduler, exit code unknown + # future: COMPLETED, FAILED, CANCELLED, TIMEOUT (from sacct) + +@dataclass +class JobStatus: + sub_id: str + ses_id: str | None + scheduler_state: SchedulerState + has_results: bool + job_id: int | None + task_id: int | None + time_used: str + time_limit: str + nodes: int + cpus: int + partition: str + name: str + + @property + def is_failed(self) -> bool: + return (self.scheduler_state == SchedulerState.DONE + and not self.has_results) + + @property + def submitted(self) -> bool: + return self.scheduler_state != SchedulerState.NOT_SUBMITTED + + @property + def key(self) -> tuple: + if self.ses_id is not None: + return (self.sub_id, self.ses_id) + return (self.sub_id,) +``` + +Scheduler fields (`job_id`, `task_id`, `time_used`, etc.) 
are kept because +the CSV is a useful artifact — job duration, partition, and node info help +with debugging even after jobs complete. + +### CSV as artifact, not source of truth + +- **Write**: include `is_failed`, `submitted`, and all scheduler fields for + human readability and debugging. +- **Read**: ignore `is_failed` and `submitted` columns; recompute from + `scheduler_state` + `has_results`. Self-heals if CSV is stale. +- The CSV persists two bits of state destroyed by later operations: + `submitted` (scheduler forgets completed jobs) and `has_results` + (branches deleted after `babs merge`). + +### Architecture + +All status logic lives in `babs/status.py`: + +``` +status.py +├── Types: SchedulerState, JobStatus +├── CSV I/O: read_job_status_csv(), write_job_status_csv() +├── Update: update_from_branches(), update_from_scheduler() +└── Init: create_initial_statuses() +``` + +Update functions are pure data in → data out: +- `update_from_branches(statuses, branches: list[str]) -> dict` + Parses branch names and joins with existing statuses in one step. + No intermediate BranchInfo type. +- `update_from_scheduler(statuses, raw_squeue: str) -> dict` + Parses raw squeue text and joins with existing statuses in one step. + No intermediate SchedulerEntry type. Parse + join together so we never + have unstructured data floating around without sub/ses attached. + +`scheduler.py` becomes thin: +- `run_squeue(queue, job_id=None) -> str` — runs command, returns raw stdout. + Format string and command flags stay here. Parsing moves to `status.py`. + +`base.py` orchestrates: +- `_update_results_status()` reads CSV, calls `update_from_branches`, + calls `update_from_scheduler`, writes CSV. Returns the dict so + `babs_status()` can pass it directly to `report_job_status()`. + +### User-visible changes + +1. **Bug fix**: ghost-job bug gone (completed job with results no longer + counted as failed when squeue is empty) +2. 
**`run_squeue` requires `job_id`** — previously defaulted to `None`, + which queries all user jobs. Made required to prevent accidental + full-queue queries (e.g. a future `--wait` polling loop would block + on unrelated jobs) and because non-array jobs from other projects + would crash the strict array format parser. + +CSV format is unchanged — same columns, same order, same values. +Internally `SchedulerState` enum distinguishes NOT_SUBMITTED from DONE, +but both serialize to empty `state` column (disambiguated by `submitted` +on read). No migration needed for existing projects. + +### Scope + +**What changed:** +- DataFrames → dataclasses + dicts +- Three `is_failed` computations → one property +- Magic strings → enums +- pandas CSV I/O → `csv` module +- squeue parsing moved from `scheduler.py` to `status.py` +- Update logic moved from `utils.py` to `status.py` + +**What did NOT change:** +- What the input streams check or how they check it +- The merge flow (`babs/merge.py`) +- The report template +- `job_submit.csv` (touched only where required at boundaries) +- `babs_submit()` still uses DataFrames internally + +### Principles + +- `is_failed` computed in exactly one place (the property) +- `submitted` derived from `scheduler_state != NOT_SUBMITTED` +- Enums instead of magic strings +- `csv` module instead of pandas for I/O +- No intermediate unstructured types — parse and join in one step +- CSV is artifact for humans, not source of truth +- `has_results` persisted in CSV (branches deleted after merge) +- Future-proof for sacct (replace DONE with real terminal states) + +## What was done + +### New file +- `babs/status.py` — data model, CSV I/O, update logic, initialization + +### Modified +- `babs/utils.py` — removed `update_results_status()`, + `update_job_batch_status()`, `results_branch_dataframe()`, + `results_status_default_values` +- `babs/base.py` — `_update_results_status()` is now a thin orchestrator + using `status.py`. Unused imports removed. 
+- `babs/interaction.py` — `babs_status()` uses returned dict directly +- `babs/scheduler.py` — added `run_squeue() -> str`. + `report_job_status()` accepts `dict[tuple, JobStatus]`. +- `babs/bootstrap.py` — `_create_initial_job_status_csv()` uses + `create_initial_statuses()` + `write_job_status_csv()` +- `babs/update.py` — `_update_job_status_with_new_inclusion()` uses + dict add/remove instead of DataFrame merge + +### Tests +- `tests/test_status.py` — new: model, CSV round-trip, update logic, + report counting. Pure data in → data out, no mocks. +- `tests/test_utils.py` — removed tests for deleted functions +- `tests/test_babs_workflow.py` — added CSV assertions at key points + in the e2e workflow + +## Remaining work + +- `babs_submit()` still uses DataFrames (`get_job_status_df`, + `get_currently_running_jobs_df`, `update_submitted_job_ids`) +- Full e2e suite on Slurm for confidence + +## Open questions + +### What is the value of `job_status.csv`? + +It's only updated by `babs status` (and `babs merge`), so it's never +guaranteed current. `babs status` re-verifies everything from external +sources every run. The CSV persists two bits of state destroyed by later +operations: `submitted` (scheduler forgets completed jobs) and `has_results` +(branches deleted after merge). Everything else is convenience artifact. +Is this the right contract, or should we rethink? + +### Worth splitting `has_unmerged_results` and `has_merged_results`? + +Currently `has_results` is a single bool. Before merge, evidence comes from +branches in output RIA. After merge, branches are deleted and the CSV is the +only record. Should we track these separately so we know *where* the results +are (still in branches vs. merged into main)? + +### Is `job_status.csv` a stable interface? + +Users and external scripts may read the CSV directly. If so, column names, +order, and value encoding are part of the interface and changes require +migration. If not, we're free to improve the format. 
+ +Current decision: treat it as stable for now (no migration burden on +existing projects). But this limits our ability to clean up — e.g. the +`state` column stores SLURM-specific strings (`PD`, `R`) directly, +coupling the CSV to a specific scheduler. + +### Improved CSV format + +Currently we keep the old CSV format for backward compat (no migration). +The `state` column uses empty string for both NOT_SUBMITTED and DONE, +disambiguated by `submitted`. A cleaner format would use a +`scheduler_state` column with explicit values (`NOT_SUBMITTED`, `DONE`, +`PENDING`, `RUNNING`, etc.) and drop the redundant `submitted` and +`is_failed` columns. This would also decouple the CSV from SLURM-specific +state codes. Would require a migration path for existing projects. diff --git a/tests/test_babs_workflow.py b/tests/test_babs_workflow.py index de6a1ab9..674609e8 100644 --- a/tests/test_babs_workflow.py +++ b/tests/test_babs_workflow.py @@ -20,9 +20,11 @@ from babs import base as babs_base from babs.cli import _enter_check_setup, _enter_init, _enter_merge, _enter_status, _enter_submit from babs.scheduler import squeue_to_pandas +from babs.status import SchedulerState, read_job_status_csv from babs.utils import get_results_branches_from_clone +@pytest.mark.timeout(450) @pytest.mark.parametrize('processing_level', ['subject', 'session']) def test_babs_init_raw_bids( tmp_path_factory, @@ -106,6 +108,16 @@ def test_babs_init_raw_bids( with mock.patch.object(argparse.ArgumentParser, 'parse_args', return_value=babs_status_opts): _enter_status() + # Verify CSV: all jobs should be NOT_SUBMITTED with no results + babs_obj = babs_base.BABS(project_root) + statuses = read_job_status_csv(babs_obj.job_status_path_abs) + assert len(statuses) > 0, 'job_status.csv should have entries after init' + for job in statuses.values(): + assert job.scheduler_state == SchedulerState.NOT_SUBMITTED + assert not job.has_results + assert not job.submitted + assert not job.is_failed + 
ensure_container_image(project_root, container_name) # babs submit: @@ -142,6 +154,14 @@ def test_babs_init_raw_bids( with mock.patch.object(argparse.ArgumentParser, 'parse_args', return_value=babs_status_opts): _enter_status() + # Verify CSV: first batch should have results, rest should be DONE (failed) or NOT_SUBMITTED + statuses = read_job_status_csv(babs_obj.job_status_path_abs) + jobs_with_results = [j for j in statuses.values() if j.has_results] + assert len(jobs_with_results) >= 1, 'At least one job should have results after first batch' + for job in jobs_with_results: + assert not job.is_failed + assert job.submitted + # Submit the remaining job(s): babs_submit_opts = argparse.Namespace( project_root=project_root, diff --git a/tests/test_status.py b/tests/test_status.py new file mode 100644 index 00000000..607fef90 --- /dev/null +++ b/tests/test_status.py @@ -0,0 +1,515 @@ +"""Tests for babs.status — data model, CSV I/O, and update logic.""" + +import os + +import pytest + +from babs.scheduler import report_job_status +from babs.status import ( + JobStatus, + SchedulerState, + create_initial_statuses, + read_job_status_csv, + update_from_branches, + update_from_scheduler, + write_job_status_csv, +) + +# -- SchedulerState ----------------------------------------------------------- + + +class TestSchedulerState: + def test_from_slurm_state_known_states(self): + assert SchedulerState.from_slurm_state('PD') == SchedulerState.PENDING + assert SchedulerState.from_slurm_state('R') == SchedulerState.RUNNING + assert SchedulerState.from_slurm_state('CG') == SchedulerState.COMPLETING + assert SchedulerState.from_slurm_state('CF') == SchedulerState.CONFIGURING + + def test_from_slurm_state_unknown_raises(self): + with pytest.raises(ValueError, match='Unknown SLURM scheduler state'): + SchedulerState.from_slurm_state('BOGUS') + + +# -- JobStatus properties ----------------------------------------------------- + + +class TestJobStatusProperties: + def _make(self, 
state=SchedulerState.NOT_SUBMITTED, has_results=False, ses_id=None): + return JobStatus( + sub_id='sub-01', + ses_id=ses_id, + scheduler_state=state, + has_results=has_results, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ) + + def test_not_submitted(self): + job = self._make(SchedulerState.NOT_SUBMITTED) + assert not job.submitted + assert not job.is_failed + + def test_running_not_failed(self): + job = self._make(SchedulerState.RUNNING) + assert job.submitted + assert not job.is_failed + + def test_pending_not_failed(self): + job = self._make(SchedulerState.PENDING) + assert job.submitted + assert not job.is_failed + + def test_done_with_results_not_failed(self): + job = self._make(SchedulerState.DONE, has_results=True) + assert job.submitted + assert not job.is_failed + + def test_done_without_results_is_failed(self): + job = self._make(SchedulerState.DONE, has_results=False) + assert job.submitted + assert job.is_failed + + def test_key_subject_only(self): + job = self._make() + assert job.key == ('sub-01',) + + def test_key_with_session(self): + job = self._make(ses_id='ses-A') + assert job.key == ('sub-01', 'ses-A') + + +# -- CSV round-trip ------------------------------------------------------------ + + +class TestCSVRoundTrip: + def _sample_statuses(self): + return { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.DONE, + has_results=True, + job_id=123, + task_id=1, + time_used='1:30:00', + time_limit='5-00:00:00', + nodes=1, + cpus=4, + partition='normal', + name='my_job', + ), + ('sub-02',): JobStatus( + sub_id='sub-02', + ses_id=None, + scheduler_state=SchedulerState.NOT_SUBMITTED, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ), + } + + def test_round_trip_subject_level(self, tmp_path): + original = self._sample_statuses() + path = str(tmp_path / 
'job_status.csv') + write_job_status_csv(path, original) + loaded = read_job_status_csv(path) + + assert set(loaded.keys()) == set(original.keys()) + for key in original: + assert loaded[key].sub_id == original[key].sub_id + assert loaded[key].scheduler_state == original[key].scheduler_state + assert loaded[key].has_results == original[key].has_results + assert loaded[key].job_id == original[key].job_id + assert loaded[key].is_failed == original[key].is_failed + assert loaded[key].submitted == original[key].submitted + + def test_round_trip_session_level(self, tmp_path): + original = { + ('sub-01', 'ses-A'): JobStatus( + sub_id='sub-01', + ses_id='ses-A', + scheduler_state=SchedulerState.RUNNING, + has_results=False, + job_id=456, + task_id=2, + time_used='0:10', + time_limit='2:00:00', + nodes=1, + cpus=2, + partition='gpu', + name='gpu_job', + ), + } + path = str(tmp_path / 'job_status.csv') + write_job_status_csv(path, original) + loaded = read_job_status_csv(path) + + job = loaded[('sub-01', 'ses-A')] + assert job.ses_id == 'ses-A' + assert job.scheduler_state == SchedulerState.RUNNING + assert job.submitted + + def test_derived_columns_written_but_ignored_on_read(self, tmp_path): + """is_failed and submitted are written to CSV but recomputed on read.""" + path = str(tmp_path / 'job_status.csv') + statuses = { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.DONE, + has_results=False, + job_id=1, + task_id=1, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ), + } + write_job_status_csv(path, statuses) + + # Verify is_failed=True is written + with open(path) as f: + content = f.read() + assert 'True' in content # is_failed column + + # On read, is_failed is derived, not read from CSV + loaded = read_job_status_csv(path) + assert loaded[('sub-01',)].is_failed is True + + def test_empty_statuses_no_file(self, tmp_path): + path = str(tmp_path / 'job_status.csv') + 
write_job_status_csv(path, {}) + assert not os.path.exists(path) + + def test_empty_statuses_remove_existing_file(self, tmp_path): + path = str(tmp_path / 'job_status.csv') + original = self._sample_statuses() + write_job_status_csv(path, original) + assert os.path.exists(path) + + write_job_status_csv(path, {}) + assert not os.path.exists(path) + + +# -- update_from_branches ------------------------------------------------------ + + +class TestUpdateFromBranches: + def _initial_statuses(self): + return { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.DONE, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ), + ('sub-02',): JobStatus( + sub_id='sub-02', + ses_id=None, + scheduler_state=SchedulerState.NOT_SUBMITTED, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ), + } + + def test_branch_sets_has_results(self): + statuses = self._initial_statuses() + updated = update_from_branches(statuses, ['job-100-1-sub-01']) + + assert updated[('sub-01',)].has_results is True + assert updated[('sub-01',)].job_id == 100 + assert updated[('sub-01',)].task_id == 1 + + def test_no_match_leaves_unchanged(self): + statuses = self._initial_statuses() + updated = update_from_branches(statuses, ['job-100-1-sub-99']) + + assert updated[('sub-01',)].has_results is False + assert updated[('sub-02',)].has_results is False + + def test_non_matching_branch_names_ignored(self): + statuses = self._initial_statuses() + updated = update_from_branches(statuses, ['main', 'not-a-job']) + + assert updated == statuses + + def test_session_level_branches(self): + statuses = { + ('sub-01', 'ses-A'): JobStatus( + sub_id='sub-01', + ses_id='ses-A', + scheduler_state=SchedulerState.DONE, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, 
+ partition='', + name='', + ), + } + updated = update_from_branches(statuses, ['job-200-1-sub-01-ses-A']) + + assert updated[('sub-01', 'ses-A')].has_results is True + assert updated[('sub-01', 'ses-A')].job_id == 200 + + def test_already_has_results_preserved(self): + """A job that already has_results=True keeps it even with no matching branch.""" + statuses = { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.DONE, + has_results=True, + job_id=50, + task_id=1, + time_used='1:00', + time_limit='5-00:00:00', + nodes=1, + cpus=1, + partition='normal', + name='old_job', + ), + } + updated = update_from_branches(statuses, []) + + assert updated[('sub-01',)].has_results is True + assert updated[('sub-01',)].job_id == 50 + + def test_empty_statuses_with_branches(self): + """Branches for unknown subjects are ignored.""" + updated = update_from_branches({}, ['job-100-1-sub-99']) + assert len(updated) == 0 + + +# -- update_from_scheduler ----------------------------------------------------- + + +class TestUpdateFromScheduler: + def _submitted_statuses(self): + return { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.RUNNING, + has_results=False, + job_id=100, + task_id=1, + time_used='0:05', + time_limit='5-00:00:00', + nodes=1, + cpus=1, + partition='normal', + name='my_job', + ), + ('sub-02',): JobStatus( + sub_id='sub-02', + ses_id=None, + scheduler_state=SchedulerState.PENDING, + has_results=False, + job_id=100, + task_id=2, + time_used='0:00', + time_limit='5-00:00:00', + nodes=1, + cpus=1, + partition='normal', + name='my_job', + ), + } + + def test_still_running(self): + statuses = self._submitted_statuses() + raw = '100_1|R|0:10|5-00:00:00|1|1|normal|my_job\n' + updated = update_from_scheduler(statuses, raw) + + assert updated[('sub-01',)].scheduler_state == SchedulerState.RUNNING + assert updated[('sub-01',)].time_used == '0:10' + + def 
test_job_left_scheduler_no_results_is_failed(self): + """The core bug scenario: squeue is empty, job had no results -> DONE -> is_failed.""" + statuses = self._submitted_statuses() + raw = '' # nothing in queue + updated = update_from_scheduler(statuses, raw) + + assert updated[('sub-01',)].scheduler_state == SchedulerState.DONE + assert updated[('sub-01',)].is_failed is True + + def test_job_left_scheduler_with_results_not_failed(self): + """Job finished and has results — not failed.""" + statuses = self._submitted_statuses() + # Give sub-01 results first + statuses[('sub-01',)] = JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.RUNNING, + has_results=True, + job_id=100, + task_id=1, + time_used='0:05', + time_limit='5-00:00:00', + nodes=1, + cpus=1, + partition='normal', + name='my_job', + ) + raw = '' # nothing in queue + updated = update_from_scheduler(statuses, raw) + + assert updated[('sub-01',)].scheduler_state == SchedulerState.DONE + assert updated[('sub-01',)].is_failed is False + + def test_not_submitted_stays_not_submitted(self): + statuses = { + ('sub-01',): JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=SchedulerState.NOT_SUBMITTED, + has_results=False, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ), + } + raw = '' + updated = update_from_scheduler(statuses, raw) + + assert updated[('sub-01',)].scheduler_state == SchedulerState.NOT_SUBMITTED + assert not updated[('sub-01',)].is_failed + + def test_partial_squeue_some_done_some_running(self): + statuses = self._submitted_statuses() + # Only sub-02 still in queue + raw = '100_2|PD|0:00|5-00:00:00|1|1|normal|my_job\n' + updated = update_from_scheduler(statuses, raw) + + assert updated[('sub-01',)].scheduler_state == SchedulerState.DONE + assert updated[('sub-01',)].is_failed is True + assert updated[('sub-02',)].scheduler_state == SchedulerState.PENDING + + def 
test_non_array_job_id_raises(self): + """squeue output without array format (no underscore) should raise.""" + statuses = self._submitted_statuses() + raw = '100|R|0:10|5-00:00:00|1|1|normal|my_job\n' + with pytest.raises(ValueError, match='Expected array job format'): + update_from_scheduler(statuses, raw) + + +# -- create_initial_statuses --------------------------------------------------- + + +class TestCreateInitialStatuses: + def test_subject_level(self): + entries = [{'sub_id': 'sub-01'}, {'sub_id': 'sub-02'}] + statuses = create_initial_statuses(entries) + + assert len(statuses) == 2 + assert statuses[('sub-01',)].scheduler_state == SchedulerState.NOT_SUBMITTED + assert not statuses[('sub-01',)].submitted + assert not statuses[('sub-01',)].has_results + assert not statuses[('sub-01',)].is_failed + + def test_session_level(self): + entries = [ + {'sub_id': 'sub-01', 'ses_id': 'ses-A'}, + {'sub_id': 'sub-01', 'ses_id': 'ses-B'}, + ] + statuses = create_initial_statuses(entries) + + assert len(statuses) == 2 + assert ('sub-01', 'ses-A') in statuses + assert ('sub-01', 'ses-B') in statuses + + +# -- report_job_status --------------------------------------------------------- + + +class TestReportJobStatus: + def _make(self, state=SchedulerState.NOT_SUBMITTED, has_results=False): + return JobStatus( + sub_id='sub-01', + ses_id=None, + scheduler_state=state, + has_results=has_results, + job_id=None, + task_id=None, + time_used='', + time_limit='', + nodes=0, + cpus=0, + partition='', + name='', + ) + + def test_all_not_submitted(self, capsys): + statuses = { + ('sub-01',): self._make(), + ('sub-02',): self._make(), + } + report_job_status(statuses, '/fake/analysis') + out = capsys.readouterr().out + assert '2 jobs' in out + assert '0 job(s) have been submitted' in out + + def test_mixed_states(self, capsys): + statuses = { + ('sub-01',): self._make(SchedulerState.DONE, has_results=True), + ('sub-02',): self._make(SchedulerState.RUNNING), + ('sub-03',): 
self._make(SchedulerState.PENDING), + ('sub-04',): self._make(SchedulerState.DONE, has_results=False), + ('sub-05',): self._make(), + } + report_job_status(statuses, '/fake/analysis') + out = capsys.readouterr().out + assert '5 jobs' in out + assert '4 job(s) have been submitted' in out + assert '1 job(s) successfully finished' in out + assert '1 job(s) are pending' in out + assert '1 job(s) are running' in out + assert '1 job(s) failed' in out + + def test_all_done(self, capsys): + statuses = { + ('sub-01',): self._make(SchedulerState.DONE, has_results=True), + ('sub-02',): self._make(SchedulerState.DONE, has_results=True), + } + report_job_status(statuses, '/fake/analysis') + out = capsys.readouterr().out + assert 'All jobs are completed' in out diff --git a/tests/test_utils.py b/tests/test_utils.py index 9a3314b8..a986c8b6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,7 +4,6 @@ from pathlib import Path from unittest.mock import MagicMock, patch -import datalad.api as dlapi import pandas as pd import pytest import yaml @@ -15,7 +14,6 @@ get_git_show_ref_shasum, get_immediate_subdirectories, get_repo_hash, - get_results_branches, get_results_branches_from_clone, get_results_branches_from_ria, get_username, @@ -23,34 +21,11 @@ parse_select_arg, read_yaml, replace_placeholder_from_config, - results_branch_dataframe, - update_job_batch_status, - update_results_status, update_submitted_job_ids, validate_processing_level, ) -def datalad_dataset_with_branches(ds_path, branch_list): - """Create a DataLad dataset with branches from the provided list.""" - ds = dlapi.create(path=ds_path) - ds.save(message='Initial commit') - - for branch in branch_list: - subprocess.run(['git', 'checkout', '-b', branch], cwd=ds_path, capture_output=True) - (ds_path / f'{branch}.txt').write_text(f'Content for {branch}') - ds.save(message=f'Add content for {branch}') - subprocess.run(['git', 'checkout', 'main'], cwd=ds_path, capture_output=True) - - return ds_path - - 
-BRANCH_LISTS = [ - ['job-123-1-sub-01', 'job-123-2-sub-02', 'job-124-1-sub-03'], - ['job-125-1-sub-01-ses-01', 'job-125-2-sub-01-ses-02', 'job-126-1-sub-02-ses-01'], -] - - def test_get_username(): """Test that get_username returns the current username.""" # Get the expected username using Python's getpass module @@ -191,19 +166,6 @@ def test_git_show_ref_shasum(tmp_path): get_git_show_ref_shasum('nonexistent-branch', repo_path) -@pytest.mark.parametrize('branch_list', BRANCH_LISTS) -def test_results_branch_dataframe(tmp_path_factory, branch_list): - """Test that branch info is correctly extracted to dataframe.""" - ds_path = datalad_dataset_with_branches(tmp_path_factory.mktemp('test_df'), branch_list) - branch_list = get_results_branches(ds_path) - if not len(branch_list) == len(branch_list): - raise ValueError('branch_list should have the same length as the number of branches') - - df = results_branch_dataframe(branch_list, 'subject') - - assert df.shape[0] == len(branch_list) - - def test_get_results_branches_from_clone(tmp_path): """get_results_branches_from_clone returns job-* branches, skips HEAD and non-job refs.""" with patch('babs.utils.subprocess.run') as mock_run: @@ -279,64 +241,6 @@ def test_replace_placeholder_from_config(): assert replace_placeholder_from_config(42) == '42' -def test_update_results_status(): - """Test update_results_status function.""" - # One session has results in the results branch - has_results_df = pd.DataFrame( - { - 'sub_id': ['sub-0002', 'sub-0002'], - 'ses_id': ['ses-01', 'ses-02'], - 'job_id': [2, 1], - 'task_id': [1, 1], - 'has_results': [True, True], - } - ) - - # The previous status was checked before submitting the new jobs - previous_status_df = pd.DataFrame( - { - 'sub_id': ['sub-0001', 'sub-0001', 'sub-0002', 'sub-0002'], - 'ses_id': ['ses-01', 'ses-02', 'ses-01', 'ses-02'], - 'job_id': [-1, -1, -1, 1], - 'task_id': [-1, -1, -1, 1], - 'submitted': [False, False, False, True], - 'state': [pd.NA, pd.NA, pd.NA, 'R'], 
- 'time_used': [pd.NA, pd.NA, pd.NA, '10:00'], - 'time_limit': ['5-00:00:00', '5-00:00:00', '5-00:00:00', '5-00:00:00'], - 'nodes': [pd.NA, pd.NA, pd.NA, 1], - 'cpus': [pd.NA, pd.NA, pd.NA, 1], - 'partition': [pd.NA, pd.NA, pd.NA, 'normal'], - 'name': [pd.NA, pd.NA, pd.NA, 'first_run'], - 'has_results': [False, False, False, True], - # Fields for tracking: - 'needs_resubmit': [False, False, False, False], - 'is_failed': [pd.NA, pd.NA, pd.NA, False], - 'log_filename': [pd.NA, pd.NA, pd.NA, 'test_array_job.log'], - 'last_line_stdout_file': [pd.NA, pd.NA, pd.NA, 'SUCCESS'], - 'alert_message': [pd.NA, pd.NA, pd.NA, pd.NA], - } - ) - - updated_df = update_results_status(previous_status_df, has_results_df) - - # Check the shape of the returned dataframe - assert updated_df.shape[0] == previous_status_df.shape[0] - - # Check that job_id and task_id were updated for entries that have results - assert updated_df.loc[2, 'job_id'] == 2 - assert updated_df.loc[2, 'task_id'] == 1 - assert updated_df.loc[3, 'job_id'] == 1 - assert updated_df.loc[3, 'task_id'] == 1 - - # Check that has_results field was updated - assert updated_df.loc[2, 'has_results'] - assert updated_df.loc[3, 'has_results'] - - # Check that is_failed field was updated correctly - assert not updated_df.loc[2, 'is_failed'] - assert not updated_df.loc[3, 'is_failed'] - - def test_combine_inclusion_dataframes(): """Test combine_inclusion_dataframes function.""" # Create test DataFrames @@ -420,53 +324,6 @@ def test_running_jobs(): assert identified_running_df.shape[0] == 3 -def test_update_job_batch_status(): - job_submit_df = pd.DataFrame( - { - 'sub_id': ['sub-0001', 'sub-0001'], - 'ses_id': ['ses-01', 'ses-02'], - 'job_id': [3, 3], - 'task_id': [1, 2], - 'state': ['R', 'PD'], - 'time_used': ['0:30', '0:00'], - 'time_limit': ['5-00:00:00', '5-00:00:00'], - 'nodes': [1, 1], - 'cpus': [1, 1], - 'partition': ['normal', 'normal'], - 'name': ['third_run', 'third_run'], - } - ) - - # The previous status was checked 
before submitting the new jobs - status_df = pd.DataFrame( - { - 'sub_id': ['sub-0001', 'sub-0001', 'sub-0002', 'sub-0002'], - 'ses_id': ['ses-01', 'ses-02', 'ses-01', 'ses-02'], - 'job_id': [-1, -1, 2, 1], - 'task_id': [-1, -1, 1, 1], - 'submitted': [False, False, True, True], - 'state': [pd.NA, pd.NA, 'R', 'R'], - 'time_used': [pd.NA, pd.NA, pd.NA, '10:00'], - 'time_limit': ['5-00:00:00', '5-00:00:00', '5-00:00:00', '5-00:00:00'], - 'nodes': [pd.NA, pd.NA, 1, 1], - 'cpus': [pd.NA, pd.NA, 1, 1], - 'partition': [pd.NA, pd.NA, 'normal', 'normal'], - 'name': [pd.NA, pd.NA, 'second_run', 'first_run'], - 'has_results': [False, False, False, True], - # Fields for tracking: - 'needs_resubmit': [False, False, False, False], - 'is_failed': [pd.NA, pd.NA, pd.NA, False], - 'log_filename': [pd.NA, pd.NA, pd.NA, 'test_array_job.log'], - 'last_line_stdout_file': [pd.NA, pd.NA, pd.NA, 'SUCCESS'], - 'alert_message': [pd.NA, pd.NA, pd.NA, pd.NA], - } - ) - - new_status_df = update_job_batch_status(status_df, job_submit_df) - - assert new_status_df.shape[0] == status_df.shape[0] - - def test_parse_select_arg(): select_arg = ['sub-0001', 'sub-0002'] assert parse_select_arg(select_arg).equals(pd.DataFrame({'sub_id': select_arg})) @@ -595,55 +452,6 @@ def test_parse_select_nested(): pd.testing.assert_frame_equal(result, expected) -def test_results_merged_zip(): - """Test update_results_status with merged_zip_completion_df.""" - previous_df = pd.DataFrame( - { - 'sub_id': ['sub-0001', 'sub-0002'], - 'ses_id': ['ses-01', 'ses-01'], - 'job_id': [-1, -1], - 'task_id': [-1, -1], - 'submitted': [False, False], - 'has_results': [False, False], - 'state': ['', ''], - 'is_failed': [False, False], - } - ) - - job_completion_df = pd.DataFrame( - columns=['sub_id', 'ses_id', 'job_id', 'task_id', 'has_results'] - ) - - merged_zip_completion_df = pd.DataFrame({'sub_id': ['sub-0001'], 'ses_id': ['ses-01']}) - - result = update_results_status(previous_df, job_completion_df, merged_zip_completion_df) 
- - assert result.shape[0] == 2 - assert result.loc[0, 'has_results'] is True - assert pd.isna(result.loc[0, 'job_id']) - assert pd.isna(result.loc[0, 'task_id']) - - -def test_results_empty_previous(): - """Test update_results_status with empty previous dataframe.""" - previous_df = pd.DataFrame( - columns=['sub_id', 'ses_id', 'job_id', 'task_id', 'submitted', 'has_results', 'state'] - ) - - job_completion_df = pd.DataFrame( - { - 'sub_id': ['sub-0001'], - 'ses_id': ['ses-01'], - 'job_id': [1], - 'task_id': [1], - 'has_results': [True], - } - ) - - result = update_results_status(previous_df, job_completion_df) - assert result.shape[0] == 0 - - def test_validate_inclusion_df(): """Test validate_sub_ses_processing_inclusion with DataFrame input.""" from babs.utils import validate_sub_ses_processing_inclusion @@ -713,29 +521,6 @@ def test_identify_running_jobs_error(): identify_running_jobs(last_submitted_df, currently_running_df) -def test_branch_no_matches(): - """Test results_branch_dataframe with branches that don't match pattern.""" - branches = ['not-a-job-branch', 'also-not-matching', 'master'] - df = results_branch_dataframe(branches, 'subject') - - assert df.empty - assert list(df.columns) == ['job_id', 'task_id', 'sub_id', 'has_results'] - - -def test_branch_session_level(): - """Test results_branch_dataframe with session-level processing.""" - branches = [ - 'job-123-1-sub-0001-ses-01', - 'job-123-2-sub-0001-ses-02', - 'job-124-1-sub-0002-ses-01', - ] - df = results_branch_dataframe(branches, 'session') - - assert df.shape[0] == 3 - assert 'ses_id' in df.columns - assert all(df['has_results']) - - def test_submitted_multiple_jobs(): """Test update_submitted_job_ids with multiple job IDs.""" results_df = pd.DataFrame(