diff --git a/arc/scheduler.py b/arc/scheduler.py index 60bca0804c..50002c8157 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -622,55 +622,21 @@ def schedule_jobs(self): job = self.job_dict[label]['conf_opt'][i] if 'conf_opt' in job_name \ else self.job_dict[label]['conf_sp'][i] if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs): - # this is a completed conformer job successful_server_termination = self.end_job(job=job, label=label, job_name=job_name) if successful_server_termination: troubleshooting_conformer = self.parse_conformer(job=job, label=label, i=i) if 'conf_opt' in job_name and self.job_types['conf_sp'] and not troubleshooting_conformer: # Accumulate for deferred pipe batching of conf_sp. self._pending_pipe_conf_sp.setdefault(label, set()).add(i) - if troubleshooting_conformer: - break - # Just terminated a conformer job. - # Are there additional conformer jobs currently running for this species? - # Note: end_job already removed the current job from running_jobs, - # so we don't need to exclude job_name. - for spec_jobs in job_list: - if 'conf_opt' in spec_jobs or 'conf_sp' in spec_jobs: - break - else: - # All conformer jobs terminated. - # Check isomorphism and run opt on most stable conformer geometry. - logger.info(f'\nConformer jobs for {label} successfully terminated.\n') - if self.species_dict[label].is_ts: - self.determine_most_likely_ts_conformer(label) - else: - self.determine_most_stable_conformer(label, sp_flag=True if self.job_types['conf_sp'] else False) # also checks isomorphism - if self.species_dict[label].initial_xyz is not None: - # if initial_xyz is None, then we're probably troubleshooting conformers, don't opt - if not self.composite_method: - self.run_opt_job(label, fine=self.fine_only) - else: - self.run_composite_job(label) self.timer = False break if 'tsg' in job_name: job = self.job_dict[label]['tsg'][get_i_from_job_name(job_name)] if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs): - # This is a successfully completed tsg job. It may have resulted in several TSGuesses. self.end_job(job=job, label=label, job_name=job_name) if job.local_path_to_output_file.endswith('.yml') or job.local_path_to_output_file.endswith('.log'): for rxn in job.reactions: rxn.ts_species.process_completed_tsg_queue_jobs(path=job.local_path_to_output_file) - # Just terminated a tsg job. - # Are there additional tsg jobs currently running for this species? - for spec_jobs in job_list: - if 'tsg' in spec_jobs: - break - else: - # All tsg jobs terminated. Spawn confs. - logger.info(f'\nTS guess jobs for {label} successfully terminated.\n') - self.run_conformer_jobs(labels=[label]) self.timer = False break elif 'opt' in job_name and 'conf_opt' not in job_name: @@ -803,20 +769,12 @@ def schedule_jobs(self): self.timer = False break - if not len(job_list): - has_pending_pipe_work = ( - label in self._pending_pipe_sp - or label in self._pending_pipe_freq - or any(lbl == label for lbl, _ in self._pending_pipe_irc) - or label in self._pending_pipe_conf_sp - or any(label in {t.owner_key for t in p.tasks} - for p in self.active_pipes.values()) - ) - if not has_pending_pipe_work: - self.check_all_done(label) - if not self.running_jobs[label]: - # Delete the label only if it represents an empty entry. - del self.running_jobs[label] + for label in list(self.unique_species_labels): + if label in self.output and self.output[label]['convergence'] is False: + continue + self._check_conformer_stage_complete(label) + self._check_tsg_stage_complete(label) + self._check_species_complete(label) # Poll active pipe runs (per-run failures are handled inside poll_pipes). if self.active_pipes: @@ -840,6 +798,114 @@ def schedule_jobs(self): # Generate a TS report: self.generate_final_ts_guess_report() + def _check_conformer_stage_complete(self, label: str) -> None: + """ + Check whether all conformer jobs (conf_opt/conf_sp) for a species have + finished. If so, select the best conformer and spawn the next job. + + Called unconditionally after job event processing so that no break + in the job-processing loop can skip the conformer-to-opt transition. + """ + if 'conf_opt' not in self.job_dict.get(label, {}): + return + if any('conf_opt' in j or 'conf_sp' in j + for j in self.running_jobs.get(label, [])): + return + if label in self._pending_pipe_conf_sp: + return + if any(label in {t.owner_key for t in p.tasks} + for p in self.active_pipes.values() + if any(t.task_family in ('conf_opt', 'conf_sp', 'ts_opt') for t in p.tasks)): + return + if self.species_dict[label].initial_xyz is not None: + return + if self.output[label].get('job_types', {}).get('conf_opt'): + return + if self.species_dict[label].is_ts and self.species_dict[label].ts_guesses_exhausted: + return + + if self.species_dict[label].is_ts: + has_successful_conformer = any( + tsg.energy is not None for tsg in self.species_dict[label].ts_guesses) + else: + has_successful_conformer = any( + e is not None for e in self.species_dict[label].conformer_energies) + + if not has_successful_conformer: + logger.error(f'All conformer jobs for {label} failed. ' + f'No conformer has a valid energy.') + if self.species_dict[label].is_ts: + self.species_dict[label].ts_guesses_exhausted = True + return + + logger.info(f'\nConformer jobs for {label} successfully terminated.\n') + if self.species_dict[label].is_ts: + self.determine_most_likely_ts_conformer(label) + else: + self.determine_most_stable_conformer( + label, sp_flag=True if self.job_types.get('conf_sp') else False) + if self.species_dict[label].initial_xyz is not None: + if not self.composite_method: + self.run_opt_job(label, fine=self.fine_only) + else: + self.run_composite_job(label) + elif not any('conf_opt' in j or 'conf_sp' in j + for j in self.running_jobs.get(label, [])): + self.output[label]['job_types']['conf_opt'] = True + + def _check_tsg_stage_complete(self, label: str) -> None: + """ + Check whether all TS guess jobs for a species have finished. + If so, spawn conformer jobs for the TS. + """ + if 'tsg' not in self.job_dict.get(label, {}): + return + if any('tsg' in j for j in self.running_jobs.get(label, [])): + return + if not self.species_dict[label].is_ts: + return + if self.species_dict[label].ts_conf_spawned: + return + if self.species_dict[label].ts_guesses_exhausted: + return + if not all(tsg.success is not None for tsg in self.species_dict[label].ts_guesses): + return + + if not any(tsg.success for tsg in self.species_dict[label].ts_guesses): + logger.error(f'All TS guess jobs for {label} failed. ' + f'No successful TS guess found.') + self.species_dict[label].ts_guesses_exhausted = True + return + + logger.info(f'\nTS guess jobs for {label} successfully terminated.\n') + self.run_conformer_jobs(labels=[label]) + + def _check_species_complete(self, label: str) -> None: + """ + Check whether all jobs for a species are complete and call + check_all_done if so. Clean up empty running_jobs entries. + """ + if label in self.output and self.output[label]['convergence'] is not None: + # Species already finalized (converged or failed); clean up and skip. + if label in self.running_jobs and not self.running_jobs[label]: + del self.running_jobs[label] + return + running = self.running_jobs.get(label, []) + if running: + return + has_pending_pipe_work = ( + label in self._pending_pipe_sp + or label in self._pending_pipe_freq + or any(lbl == label for lbl, _ in self._pending_pipe_irc) + or label in self._pending_pipe_conf_sp + or any(label in {t.owner_key for t in p.tasks} + for p in self.active_pipes.values()) + ) + if not has_pending_pipe_work: + self.check_all_done(label) + if label in self.running_jobs and not self.running_jobs[label]: + del self.running_jobs[label] + def run_job(self, job_type: str, conformer: Optional[int] = None, diff --git a/arc/scheduler_test.py b/arc/scheduler_test.py index 3216a9f254..9e3e307e7b 100644 --- a/arc/scheduler_test.py +++ b/arc/scheduler_test.py @@ -8,6 +8,7 @@ import unittest import os import shutil +from unittest.mock import patch, MagicMock import arc.parser.parser as parser from arc.checks.ts import check_ts @@ -19,7 +20,7 @@ from arc.imports import settings from arc.reaction import ARCReaction from arc.species.converter import str_to_xyz -from arc.species.species import ARCSpecies +from arc.species.species import ARCSpecies, TSGuess default_levels_of_theory = settings['default_levels_of_theory'] @@ -757,6 +758,135 @@ def test_add_label_to_unique_species_labels(self): self.assertEqual(unique_label, 'new_species_15_1') self.assertEqual(self.sched2.unique_species_labels, ['methylamine', 'C2H6', 'CtripCO', 'new_species_15', 'new_species_15_0', 'new_species_15_1']) + def _make_isolated_scheduler(self): + """Create a Scheduler with a fresh species object for tests that mutate species state.""" + spc = ARCSpecies(label='spc_test', smiles='CN', + xyz=str_to_xyz("""C -0.57422867 -0.01669771 0.01229213 +N 0.82084044 0.08279104 -0.37769346 +H -1.05737005 -0.84067772 -0.52007494 +H -1.10211468 0.90879867 -0.23383011 +H -0.66133128 -0.19490562 1.08785111 +H 0.88047852 0.26966160 -1.37780789 +H 1.27889520 -0.81548721 -0.22940984""")) + sched = Scheduler( + project='project_test_stage_checks', + ess_settings=self.ess_settings, + species_list=[spc], + composite_method=None, + conformer_opt_level=Level(repr=default_levels_of_theory['conformer']), + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + scan_level=Level(repr=default_levels_of_theory['scan']), + ts_guess_level=Level(repr=default_levels_of_theory['ts_guesses']), + project_directory=os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage6'), + testing=True, + job_types=self.job_types1, + orbitals_level=default_levels_of_theory['orbitals'], + adaptive_levels=None, + ) + return sched, spc.label + + def test_check_conformer_stage_complete_spawns_opt_for_ts(self): + """Test that _check_conformer_stage_complete() calls determine_most_likely_ts_conformer() and + spawns an opt job after all TS conformer jobs finish, even when the job-processing loop broke + early due to troubleshooting.""" + sched, label = self._make_isolated_scheduler() + # Set up species as a TS with completed conformer jobs. + sched.species_dict[label].is_ts = True + sched.species_dict[label].ts_conf_spawned = True + sched.species_dict[label].ts_guesses_exhausted = False + sched.species_dict[label].initial_xyz = None + tsg = TSGuess(method='autotst', index=0, success=True, energy=10.0) + sched.species_dict[label].ts_guesses = [tsg] + sched.job_dict[label] = {'conf_opt': {0: MagicMock()}} + sched.running_jobs[label] = [] # all conf_opt jobs done + sched.output[label]['job_types']['conf_opt'] = False + + with patch.object(sched, 'determine_most_likely_ts_conformer') as mock_det, \ + patch.object(sched, 'run_opt_job') as mock_opt: + # Simulate determine_most_likely_ts_conformer setting initial_xyz. + def set_xyz(lbl): + sched.species_dict[lbl].initial_xyz = {'symbols': ('C',), 'isotopes': (12,), 'coords': ((0, 0, 0),)} + mock_det.side_effect = set_xyz + sched._check_conformer_stage_complete(label) + mock_det.assert_called_once_with(label) + mock_opt.assert_called_once_with(label, fine=sched.fine_only) + + def test_check_tsg_stage_complete_all_failed(self): + """Test that _check_tsg_stage_complete() sets ts_guesses_exhausted when all TS guesses + failed, and does not call run_conformer_jobs().""" + sched, label = self._make_isolated_scheduler() + sched.species_dict[label].is_ts = True + sched.species_dict[label].ts_conf_spawned = False + sched.species_dict[label].ts_guesses_exhausted = False + tsg1 = TSGuess(method='autotst', index=0, success=False) + tsg2 = TSGuess(method='gcn', index=1, success=False) + sched.species_dict[label].ts_guesses = [tsg1, tsg2] + sched.job_dict[label] = {'tsg': {0: MagicMock(), 1: MagicMock()}} + sched.running_jobs[label] = [] # no tsg jobs running + + with patch.object(sched, 'run_conformer_jobs') as mock_conf: + sched._check_tsg_stage_complete(label) + mock_conf.assert_not_called() + self.assertTrue(sched.species_dict[label].ts_guesses_exhausted) + + def test_check_tsg_stage_complete_no_repeat_after_exhausted(self): + """Test that _check_tsg_stage_complete() returns immediately when ts_guesses_exhausted + is already True (does not re-log or re-call run_conformer_jobs).""" + sched, label = self._make_isolated_scheduler() + sched.species_dict[label].is_ts = True + sched.species_dict[label].ts_conf_spawned = False + sched.species_dict[label].ts_guesses_exhausted = True + tsg = TSGuess(method='autotst', index=0, success=False) + sched.species_dict[label].ts_guesses = [tsg] + sched.job_dict[label] = {'tsg': {0: MagicMock()}} + sched.running_jobs[label] = [] + + with patch.object(sched, 'run_conformer_jobs') as mock_conf: + sched._check_tsg_stage_complete(label) + mock_conf.assert_not_called() + + def test_check_species_complete_no_repeat_after_converged(self): + """Test that _check_species_complete() does not call check_all_done() + for a species whose convergence is already True.""" + sched, label = self._make_isolated_scheduler() + sched.output[label]['convergence'] = True + sched.running_jobs[label] = [] # empty entry left over + + with patch.object(sched, 'check_all_done') as mock_cad: + sched._check_species_complete(label) + mock_cad.assert_not_called() + # Also verify empty running_jobs entry was cleaned up. + self.assertNotIn(label, sched.running_jobs) + + def test_check_species_complete_no_repeat_after_failed(self): + """Test that _check_species_complete() does not call check_all_done() + for a species whose convergence is already False.""" + sched, label = self._make_isolated_scheduler() + sched.output[label]['convergence'] = False + + with patch.object(sched, 'check_all_done') as mock_cad: + sched._check_species_complete(label) + mock_cad.assert_not_called() + + def test_check_species_complete_calls_check_all_done_when_ready(self): + """Test that _check_species_complete() calls check_all_done() when running_jobs + is empty and convergence is still None (not yet finalized).""" + sched, label = self._make_isolated_scheduler() + sched.output[label]['convergence'] = None + sched.running_jobs[label] = [] + sched._pending_pipe_sp = set() + sched._pending_pipe_freq = set() + sched._pending_pipe_irc = set() + sched._pending_pipe_conf_sp = {} + + with patch.object(sched, 'check_all_done') as mock_cad: + sched._check_species_complete(label) + mock_cad.assert_called_once_with(label) + # Empty running_jobs entry should be cleaned up. + self.assertNotIn(label, sched.running_jobs) + @classmethod def tearDownClass(cls): """