Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion babs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,18 @@ def _parse_status():
default=Path.cwd(),
type=PathExists,
)
parser.add_argument(
'--wait',
action='store_true',
default=False,
help='Poll until all submitted jobs complete or fail.',
)
parser.add_argument(
'--wait-interval',
type=int,
default=300,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 5 minutes too long for default?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validate --wait-interval in CLI (>0) with a clear argparse error.

help='Seconds between status checks when using --wait.',
)

return parser

Expand All @@ -486,6 +498,8 @@ def _enter_status(argv=None):

def babs_status_main(
project_root: str,
wait: bool = False,
wait_interval: int = 300,
):
"""
This is the core function of `babs status`.
Expand All @@ -494,11 +508,18 @@ def babs_status_main(
----------
project_root: str
absolute path to the directory of BABS project
wait: bool
whether to poll until all submitted jobs complete or fail
wait_interval: int
seconds between status checks when using --wait
"""
from babs import BABSInteraction

babs_proj = BABSInteraction(project_root)
babs_proj.babs_status()
if wait:
babs_proj.babs_status_wait(interval=wait_interval)
else:
babs_proj.babs_status()


def _parse_merge():
Expand Down
42 changes: 42 additions & 0 deletions babs/interaction.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""This is the main module."""

import sys
import time

import numpy as np

from babs.base import BABS
Expand Down Expand Up @@ -149,3 +152,42 @@ def babs_status(self):
self.ensure_shared_group_runtime_ready()
statuses = self._update_results_status()
report_job_status(statuses, self.analysis_path)

def babs_status_wait(self, interval=300):
"""Poll job status until all submitted jobs complete or fail.

Exits 0 if nothing has been submitted or all submitted jobs
succeeded; exits 1 only if a submitted job failed; exits 130
on Ctrl-C.

Parameters
----------
interval: int
Seconds between status checks.
"""
try:
while True:
statuses = self._update_results_status()
report_job_status(statuses, self.analysis_path)
sys.stdout.flush()

submitted = [j for j in statuses.values() if j.submitted]
if not submitted:
print('No jobs have been submitted; nothing to wait on.')
return

done = all(j.has_results or j.is_failed for j in submitted)
if done:
n_results = sum(1 for j in submitted if j.has_results)
n_failed = sum(1 for j in submitted if j.is_failed)
print(
f'\nAll submitted jobs finished: {n_results} succeeded, {n_failed} failed.'
)
if n_failed > 0:
sys.exit(1)
return

time.sleep(interval)
except KeyboardInterrupt:
print('\nInterrupted by user.')
sys.exit(130)
1 change: 0 additions & 1 deletion docker/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ dependencies:
- pytest
- pytest-cov==5.0.0
- pytest-env==1.1.3
- pytest-timeout>=2.2.0
- pytest-xdist
- python=3.11
- shellcheck
1 change: 0 additions & 1 deletion environment_hpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,4 @@ dependencies:
- qstat>=0.0.5
- pytest-cov>=5.0.0
- pytest-env>=1.1.0
- pytest-timeout>=2.2.0
- pytest-xdist
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dev = ["ruff ~= 0.4.3", "pre-commit"]
tests = [
"coverage",
"pytest",
"pytest-timeout>=2.2.0",
"pytest-xdist", # for running pytest in parallel
"pytest-cov", # for ordering test execution
"datalad-osf",
Expand Down
80 changes: 35 additions & 45 deletions tests/e2e-slurm/container/walkthrough-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,26 @@ echo "Job submitted: Check setup, with job"

babs submit

# # Wait for all running jobs to finish
while [[ -n $(squeue -u "$USER" -t RUNNING,PENDING --noheader) ]]; do
echo "squeue -u \"$USER\" -t RUNNING,PENDING"
squeue -u "$USER" -t RUNNING,PENDING
echo "Waiting for running jobs to finish..."
sleep 5 # Wait for 60 seconds before checking again
done

echo "========================================================================="
echo "babs status:"
babs status
echo "========================================================================="

# Check for failed jobs TODO see above
# if sacct -u $USER --state=FAILED --noheader | grep -q "FAILED"; then
sacct -u "$USER"
if sacct -u "$USER" --noheader | grep -q "FAILED"; then
echo "========================================================================="
echo "There are failed jobs."
LOGS_DIR="analysis/logs"
if [ -d "$LOGS_DIR" ]; then
echo "========================================================================="
echo "Failed job / task logs from $LOGS_DIR:"
for f in "$LOGS_DIR"/*; do
if [ -f "$f" ]; then
echo "---------- $f ----------"
cat "$f"
echo ""
fi
done
fi
exit 1 # Exit with failure status
else
echo "========================================================================="
echo "PASSED: No failed jobs."
fi
babs status --wait --wait-interval 5
echo "PASSED: No failed jobs."

babs merge

echo "Checking job_status.csv after merge..."
cat analysis/code/job_status.csv
python -c "
import csv, sys
with open('analysis/code/job_status.csv') as f:
for row in csv.DictReader(f):
if row['submitted'].strip().lower() == 'true':
if row['has_results'].strip().lower() != 'true':
print(f'FAIL: {row[\"sub_id\"]} submitted but has_results={row[\"has_results\"]}')
sys.exit(1)
if row['is_failed'].strip().lower() == 'true':
print(f'FAIL: {row[\"sub_id\"]} has_results=True but is_failed=True')
sys.exit(1)
print('PASSED: job_status.csv is consistent')
"
echo "PASSED: e2e walkthrough successful!"

popd
Expand All @@ -134,14 +116,22 @@ pushd "${PWD}/${TEST2_NAME}"
babs check-setup

babs submit
# # Wait for all running jobs to finish
while [[ -n $(squeue -u "$USER" -t RUNNING,PENDING --noheader) ]]; do
echo "squeue -u \"$USER\" -t RUNNING,PENDING"
squeue -u "$USER" -t RUNNING,PENDING
echo "Waiting for running jobs to finish..."
sleep 5 # Wait for 60 seconds before checking again
done

babs status
babs status --wait --wait-interval 5

babs merge

echo "Checking job_status.csv after merge (multiinput)..."
cat analysis/code/job_status.csv
python -c "
import csv, sys
with open('analysis/code/job_status.csv') as f:
for row in csv.DictReader(f):
if row['submitted'].strip().lower() == 'true':
if row['has_results'].strip().lower() != 'true':
print(f'FAIL: {row[\"sub_id\"]} submitted but has_results={row[\"has_results\"]}')
sys.exit(1)
if row['is_failed'].strip().lower() == 'true':
print(f'FAIL: {row[\"sub_id\"]} has_results=True but is_failed=True')
sys.exit(1)
print('PASSED: job_status.csv is consistent (multiinput)')
"
Loading
Loading