Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 114 additions & 48 deletions arc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,55 +622,21 @@ def schedule_jobs(self):
job = self.job_dict[label]['conf_opt'][i] if 'conf_opt' in job_name \
else self.job_dict[label]['conf_sp'][i]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
# this is a completed conformer job
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
troubleshooting_conformer = self.parse_conformer(job=job, label=label, i=i)
if 'conf_opt' in job_name and self.job_types['conf_sp'] and not troubleshooting_conformer:
# Accumulate for deferred pipe batching of conf_sp.
self._pending_pipe_conf_sp.setdefault(label, set()).add(i)
if troubleshooting_conformer:
break
# Just terminated a conformer job.
# Are there additional conformer jobs currently running for this species?
# Note: end_job already removed the current job from running_jobs,
# so we don't need to exclude job_name.
for spec_jobs in job_list:
if 'conf_opt' in spec_jobs or 'conf_sp' in spec_jobs:
break
else:
# All conformer jobs terminated.
# Check isomorphism and run opt on most stable conformer geometry.
logger.info(f'\nConformer jobs for {label} successfully terminated.\n')
if self.species_dict[label].is_ts:
self.determine_most_likely_ts_conformer(label)
else:
self.determine_most_stable_conformer(label, sp_flag=True if self.job_types['conf_sp'] else False) # also checks isomorphism
if self.species_dict[label].initial_xyz is not None:
# if initial_xyz is None, then we're probably troubleshooting conformers, don't opt
if not self.composite_method:
self.run_opt_job(label, fine=self.fine_only)
else:
self.run_composite_job(label)
self.timer = False
break
if 'tsg' in job_name:
job = self.job_dict[label]['tsg'][get_i_from_job_name(job_name)]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
# This is a successfully completed tsg job. It may have resulted in several TSGuesses.
self.end_job(job=job, label=label, job_name=job_name)
if job.local_path_to_output_file.endswith('.yml') or job.local_path_to_output_file.endswith('.log'):
for rxn in job.reactions:
rxn.ts_species.process_completed_tsg_queue_jobs(path=job.local_path_to_output_file)
# Just terminated a tsg job.
# Are there additional tsg jobs currently running for this species?
for spec_jobs in job_list:
if 'tsg' in spec_jobs:
break
else:
# All tsg jobs terminated. Spawn confs.
logger.info(f'\nTS guess jobs for {label} successfully terminated.\n')
self.run_conformer_jobs(labels=[label])
self.timer = False
break
elif 'opt' in job_name and 'conf_opt' not in job_name:
Expand Down Expand Up @@ -803,20 +769,12 @@ def schedule_jobs(self):
self.timer = False
break

if not len(job_list):
has_pending_pipe_work = (
label in self._pending_pipe_sp
or label in self._pending_pipe_freq
or any(lbl == label for lbl, _ in self._pending_pipe_irc)
or label in self._pending_pipe_conf_sp
or any(label in {t.owner_key for t in p.tasks}
for p in self.active_pipes.values())
)
if not has_pending_pipe_work:
self.check_all_done(label)
if not self.running_jobs[label]:
# Delete the label only if it represents an empty entry.
del self.running_jobs[label]
for label in list(self.unique_species_labels):
if label in self.output and self.output[label]['convergence'] is False:
continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This skips failed species but still runs all three checks for already-converged species. Maybe instead, continue if convergence is not None?

self._check_conformer_stage_complete(label)
self._check_tsg_stage_complete(label)
self._check_species_complete(label)
Comment thread
calvinp0 marked this conversation as resolved.

# Poll active pipe runs (per-run failures are handled inside poll_pipes).
if self.active_pipes:
Expand All @@ -840,6 +798,114 @@ def schedule_jobs(self):
# Generate a TS report:
self.generate_final_ts_guess_report()

def _check_conformer_stage_complete(self, label: str) -> None:
    """
    Check whether all conformer jobs (``conf_opt``/``conf_sp``) for a species
    have finished. If so, select the best conformer and spawn the next job
    (a geometry optimization, or a composite job).

    Called unconditionally after job event processing so that no ``break``
    in the job-processing loop can skip the conformer-to-opt transition.

    Args:
        label (str): The species label.
    """
    if 'conf_opt' not in self.job_dict.get(label, {}):
        # No conformer stage was ever spawned for this species.
        return
    if any('conf_opt' in j or 'conf_sp' in j
           for j in self.running_jobs.get(label, [])):
        # Conformer jobs are still running on the server/incore.
        return
    if label in self._pending_pipe_conf_sp:
        # conf_sp jobs are still accumulating for deferred pipe batching.
        return
    # Block only on *conformer* pipe tasks owned by this species.
    # Other task families (e.g., ts_opt) must not prevent the conformer
    # stage from completing.
    if any(task.owner_key == label and task.task_family in ('conf_opt', 'conf_sp')
           for pipe in self.active_pipes.values()
           for task in pipe.tasks):
        return
    spc = self.species_dict[label]
    if spc.initial_xyz is not None:
        # A conformer geometry was already selected.
        return
    if self.output[label].get('job_types', {}).get('conf_opt'):
        # The conformer stage was already marked as done.
        return
    if spc.is_ts and spc.ts_guesses_exhausted:
        return

    if spc.is_ts:
        has_successful_conformer = any(
            tsg.energy is not None for tsg in spc.ts_guesses)
    else:
        has_successful_conformer = any(
            e is not None for e in spc.conformer_energies)

    if not has_successful_conformer:
        logger.error(f'All conformer jobs for {label} failed. '
                     f'No conformer has a valid energy.')
        # TODO(review): also record the failed conformer stage in self.output[label]
        # so downstream reporting can surface it.
        if spc.is_ts:
            spc.ts_guesses_exhausted = True
        return

    logger.info(f'\nConformer jobs for {label} successfully terminated.\n')
    if spc.is_ts:
        self.determine_most_likely_ts_conformer(label)
    else:
        # determine_most_stable_conformer() also checks isomorphism.
        self.determine_most_stable_conformer(
            label, sp_flag=True if self.job_types.get('conf_sp') else False)
    if spc.initial_xyz is not None:
        if not self.composite_method:
            self.run_opt_job(label, fine=self.fine_only)
        else:
            self.run_composite_job(label)
    elif not any('conf_opt' in j or 'conf_sp' in j
                 for j in self.running_jobs.get(label, [])):
        # No geometry was selected and nothing else is running:
        # mark the stage done so it is not re-checked every cycle.
        self.output[label]['job_types']['conf_opt'] = True

def _check_tsg_stage_complete(self, label: str) -> None:
    """
    Check whether all TS guess (``tsg``) jobs for a species have finished.
    If so, spawn conformer jobs for the TS.

    Args:
        label (str): The species label.
    """
    # Cheapest, most selective guard first: only TS species have a tsg stage.
    spc = self.species_dict.get(label)
    if spc is None or not spc.is_ts:
        return
    if 'tsg' not in self.job_dict.get(label, {}):
        return
    if any('tsg' in j for j in self.running_jobs.get(label, [])):
        return
    # tsg jobs may also be submitted through pipe mode; check active pipes too.
    if any(task.owner_key == label and task.task_family == 'tsg'
           for pipe in self.active_pipes.values()
           for task in pipe.tasks):
        return
    if spc.ts_conf_spawned or spc.ts_guesses_exhausted:
        # Conformers were already spawned, or there is nothing left to try.
        return
    if not all(tsg.success is not None for tsg in spc.ts_guesses):
        # At least one TS guess has not reported a result yet.
        return

    if not any(tsg.success for tsg in spc.ts_guesses):
        logger.error(f'All TS guess jobs for {label} failed. '
                     f'No successful TS guess found.')
        spc.ts_guesses_exhausted = True
        return

    logger.info(f'\nTS guess jobs for {label} successfully terminated.\n')
    self.run_conformer_jobs(labels=[label])

Comment thread
calvinp0 marked this conversation as resolved.
def _check_species_complete(self, label: str) -> None:
    """
    Check whether all jobs for a species are complete, and call
    check_all_done() if so. Also clean up empty ``running_jobs`` entries.

    Args:
        label (str): The species label.
    """
    def _drop_empty_entry() -> None:
        # Remove the label only when it maps to an empty job list.
        if label in self.running_jobs and not self.running_jobs[label]:
            del self.running_jobs[label]

    if label in self.output and self.output[label]['convergence'] is not None:
        # Species already finalized (converged or failed); just tidy up.
        _drop_empty_entry()
        return
    if self.running_jobs.get(label, []):
        # Jobs are still in flight for this species.
        return
    pipe_owned = (task.owner_key
                  for pipe in self.active_pipes.values()
                  for task in pipe.tasks)
    pending = (label in self._pending_pipe_sp
               or label in self._pending_pipe_freq
               or any(lbl == label for lbl, _ in self._pending_pipe_irc)
               or label in self._pending_pipe_conf_sp
               or label in pipe_owned)
    if not pending:
        self.check_all_done(label)
    _drop_empty_entry()
Comment thread
calvinp0 marked this conversation as resolved.

def run_job(self,
job_type: str,
conformer: Optional[int] = None,
Expand Down
132 changes: 131 additions & 1 deletion arc/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import unittest
import os
import shutil
from unittest.mock import patch, MagicMock

import arc.parser.parser as parser
from arc.checks.ts import check_ts
Expand All @@ -19,7 +20,7 @@
from arc.imports import settings
from arc.reaction import ARCReaction
from arc.species.converter import str_to_xyz
from arc.species.species import ARCSpecies
from arc.species.species import ARCSpecies, TSGuess


default_levels_of_theory = settings['default_levels_of_theory']
Expand Down Expand Up @@ -757,6 +758,135 @@ def test_add_label_to_unique_species_labels(self):
self.assertEqual(unique_label, 'new_species_15_1')
self.assertEqual(self.sched2.unique_species_labels, ['methylamine', 'C2H6', 'CtripCO', 'new_species_15', 'new_species_15_0', 'new_species_15_1'])

def _make_isolated_scheduler(self):
    """
    Create a Scheduler with a fresh species object for tests that mutate species state.

    A dedicated project directory and a newly constructed ARCSpecies keep state
    mutations in one test from leaking into another.

    Returns:
        Tuple[Scheduler, str]: The new scheduler and the label of its single species.
    """
    # Methylamine with explicit coordinates, so no conformer generation is
    # needed just to construct the species object.
    spc = ARCSpecies(label='spc_test', smiles='CN',
                     xyz=str_to_xyz("""C -0.57422867 -0.01669771 0.01229213
N 0.82084044 0.08279104 -0.37769346
H -1.05737005 -0.84067772 -0.52007494
H -1.10211468 0.90879867 -0.23383011
H -0.66133128 -0.19490562 1.08785111
H 0.88047852 0.26966160 -1.37780789
H 1.27889520 -0.81548721 -0.22940984"""))
    sched = Scheduler(
        project='project_test_stage_checks',
        ess_settings=self.ess_settings,
        species_list=[spc],
        composite_method=None,
        conformer_opt_level=Level(repr=default_levels_of_theory['conformer']),
        opt_level=Level(repr=default_levels_of_theory['opt']),
        freq_level=Level(repr=default_levels_of_theory['freq']),
        sp_level=Level(repr=default_levels_of_theory['sp']),
        scan_level=Level(repr=default_levels_of_theory['scan']),
        ts_guess_level=Level(repr=default_levels_of_theory['ts_guesses']),
        project_directory=os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage6'),
        testing=True,  # presumably prevents real ESS job submission -- NOTE(review): confirm
        job_types=self.job_types1,
        orbitals_level=default_levels_of_theory['orbitals'],
        adaptive_levels=None,
    )
    return sched, spc.label

def test_check_conformer_stage_complete_spawns_opt_for_ts(self):
    """Test that _check_conformer_stage_complete() calls determine_most_likely_ts_conformer()
    and spawns an opt job once all TS conformer jobs finish, even when the job-processing
    loop broke early due to troubleshooting."""
    sched, label = self._make_isolated_scheduler()
    species = sched.species_dict[label]
    # Arrange: a TS species whose conformer jobs have all completed.
    species.is_ts = True
    species.ts_conf_spawned = True
    species.ts_guesses_exhausted = False
    species.initial_xyz = None
    species.ts_guesses = [TSGuess(method='autotst', index=0, success=True, energy=10.0)]
    sched.job_dict[label] = {'conf_opt': {0: MagicMock()}}
    sched.running_jobs[label] = []  # all conf_opt jobs are done
    sched.output[label]['job_types']['conf_opt'] = False

    with patch.object(sched, 'determine_most_likely_ts_conformer') as det_mock, \
            patch.object(sched, 'run_opt_job') as opt_mock:
        def _assign_xyz(lbl):
            # Mimic determine_most_likely_ts_conformer() populating initial_xyz.
            sched.species_dict[lbl].initial_xyz = {'symbols': ('C',), 'isotopes': (12,), 'coords': ((0, 0, 0),)}
        det_mock.side_effect = _assign_xyz
        sched._check_conformer_stage_complete(label)
        det_mock.assert_called_once_with(label)
        opt_mock.assert_called_once_with(label, fine=sched.fine_only)

def test_check_tsg_stage_complete_all_failed(self):
    """Test that _check_tsg_stage_complete() marks ts_guesses_exhausted when every
    TS guess failed, without calling run_conformer_jobs()."""
    sched, label = self._make_isolated_scheduler()
    species = sched.species_dict[label]
    species.is_ts = True
    species.ts_conf_spawned = False
    species.ts_guesses_exhausted = False
    species.ts_guesses = [TSGuess(method='autotst', index=0, success=False),
                          TSGuess(method='gcn', index=1, success=False)]
    sched.job_dict[label] = {'tsg': {0: MagicMock(), 1: MagicMock()}}
    sched.running_jobs[label] = []  # no tsg jobs left running

    with patch.object(sched, 'run_conformer_jobs') as conf_mock:
        sched._check_tsg_stage_complete(label)
    conf_mock.assert_not_called()
    self.assertTrue(species.ts_guesses_exhausted)

def test_check_tsg_stage_complete_no_repeat_after_exhausted(self):
    """Test that _check_tsg_stage_complete() is a no-op when ts_guesses_exhausted is
    already True (no re-logging, no repeated run_conformer_jobs() call)."""
    sched, label = self._make_isolated_scheduler()
    species = sched.species_dict[label]
    species.is_ts = True
    species.ts_conf_spawned = False
    species.ts_guesses_exhausted = True  # already marked exhausted
    species.ts_guesses = [TSGuess(method='autotst', index=0, success=False)]
    sched.job_dict[label] = {'tsg': {0: MagicMock()}}
    sched.running_jobs[label] = []

    with patch.object(sched, 'run_conformer_jobs') as conf_mock:
        sched._check_tsg_stage_complete(label)
    conf_mock.assert_not_called()

def test_check_species_complete_no_repeat_after_converged(self):
    """Test that _check_species_complete() skips check_all_done() for an
    already-converged species and removes its empty running_jobs entry."""
    sched, label = self._make_isolated_scheduler()
    sched.output[label]['convergence'] = True
    sched.running_jobs[label] = []  # leftover empty entry

    with patch.object(sched, 'check_all_done') as cad_mock:
        sched._check_species_complete(label)
    cad_mock.assert_not_called()
    # The empty running_jobs entry must have been cleaned up.
    self.assertNotIn(label, sched.running_jobs)

def test_check_species_complete_no_repeat_after_failed(self):
    """Test that _check_species_complete() skips check_all_done() for a species
    that already failed to converge."""
    sched, label = self._make_isolated_scheduler()
    sched.output[label]['convergence'] = False

    with patch.object(sched, 'check_all_done') as cad_mock:
        sched._check_species_complete(label)
    cad_mock.assert_not_called()

def test_check_species_complete_calls_check_all_done_when_ready(self):
    """Test that _check_species_complete() invokes check_all_done() when no jobs are
    running, no pipe work is pending, and convergence is still undetermined."""
    sched, label = self._make_isolated_scheduler()
    sched.output[label]['convergence'] = None
    sched.running_jobs[label] = []
    # No deferred pipe work of any kind.
    sched._pending_pipe_sp = set()
    sched._pending_pipe_freq = set()
    sched._pending_pipe_irc = set()
    sched._pending_pipe_conf_sp = {}

    with patch.object(sched, 'check_all_done') as cad_mock:
        sched._check_species_complete(label)
    cad_mock.assert_called_once_with(label)
    # The empty running_jobs entry must have been cleaned up.
    self.assertNotIn(label, sched.running_jobs)

@classmethod
def tearDownClass(cls):
"""
Expand Down
Loading