Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/trouble_shooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The resource dictionary parameter `resource_dict` can contain one or more of the
* `exclusive` (bool): boolean flag to reserve exclusive access to selected compute nodes - do not allow other tasks to
use the same compute node.
* `error_log_file` (str): path to the error log file, primarily used to merge the log of multiple tasks in one file.
* `run_time_limit` (int): the maximum time the execution of the submitted Python function is allowed to take in seconds.
* `run_time_max` (int): the maximum time the execution of the submitted Python function is allowed to take in seconds.
* `priority` (int): the queuing system priority assigned to a given Python function to influence the scheduling.
* `slurm_cmd_args` (list): Additional command line arguments for the srun call (SLURM only)

Expand Down
18 changes: 9 additions & 9 deletions src/executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class FluxJobExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -154,7 +154,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -283,8 +283,8 @@ class FluxClusterExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -380,7 +380,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -417,7 +417,7 @@ def __init__(
"cwd": None,
"openmpi_oversubscribe": openmpi_oversubscribe,
"slurm_cmd_args": [],
"run_time_limit": None,
"run_time_max": None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if there's any deprecation handling for run_time_limit elsewhere in the codebase
rg -n 'run_time_limit' --type py

# Check if there are migration notes or changelog entries
fd -e md -e rst -e txt | xargs rg -l 'run_time_limit|run_time_max' 2>/dev/null || echo "No migration docs found"

Repository: pyiron/executorlib

Length of output: 85


🏁 Script executed:

# First, let's see the actual lines around 420 in flux.py
head -430 src/executorlib/executor/flux.py | tail -30

Repository: pyiron/executorlib

Length of output: 1849


🏁 Script executed:

# Check the validation functions referenced in the comment
rg -A 10 'def validate_resource_dict' --type py

Repository: pyiron/executorlib

Length of output: 1986


🏁 Script executed:

# Check if there are any tests that demonstrate the breaking behavior
fd -e py -p 'test' | xargs rg -l 'run_time' 2>/dev/null

Repository: pyiron/executorlib

Length of output: 833


🏁 Script executed:

# Look for any migration or deprecation related code
rg -i 'deprecat|migrat|compat' --type py | head -20

Repository: pyiron/executorlib

Length of output: 2848


🏁 Script executed:

# Check FluxJobExecutor's __init__ around lines 155-165 to see which validation it uses
sed -n '140,170p' src/executorlib/executor/flux.py

Repository: pyiron/executorlib

Length of output: 3374


🏁 Script executed:

# Find ResourceDictValidation schema definition
rg -B 3 'class ResourceDictValidation' --type py -A 15

Repository: pyiron/executorlib

Length of output: 1351


🏁 Script executed:

# Check if tests exist for run_time_limit behavior with both executors
cat tests/unit/executor/test_flux_job.py | head -200

Repository: pyiron/executorlib

Length of output: 6240


Breaking change: users passing run_time_limit will have it dropped, with only a generic unrecognized-key warning.

The rename from run_time_limit to run_time_max is a breaking change. Both FluxJobExecutor and FluxClusterExecutor use validate_resource_dict_with_optional_keys, which issues a warning for unrecognized keys but silently filters them out. Existing users passing run_time_limit will see a warning and the key will be ignored—meaning the time limit won't be applied.

Consider adding a deprecation shim that detects run_time_limit, issues a deprecation warning, and maps it to run_time_max for a smoother migration path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/executorlib/executor/flux.py` at line 420, Add a deprecation shim that
detects the old key "run_time_limit" in resource dicts before validation and
maps it to the new "run_time_max" while emitting a deprecation warning: modify
the code path used by FluxJobExecutor and FluxClusterExecutor to check incoming
resource dicts (e.g., inside the place that calls
validate_resource_dict_with_optional_keys) for "run_time_limit", log a clear
deprecation warning, copy its value to "run_time_max" (if "run_time_max" not
already present) and remove "run_time_limit" so the existing
validate_resource_dict_with_optional_keys behavior continues to work without
silently dropping the limit.

}
if resource_dict is None:
resource_dict = {}
Expand Down Expand Up @@ -540,8 +540,8 @@ def create_flux_executor(
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down
16 changes: 8 additions & 8 deletions src/executorlib/executor/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class SingleNodeExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -145,7 +145,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -259,8 +259,8 @@ class TestClusterExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -350,7 +350,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -479,8 +479,8 @@ def create_single_node_executor(
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down
16 changes: 8 additions & 8 deletions src/executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class SlurmClusterExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -149,7 +149,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -293,8 +293,8 @@ class SlurmJobExecutor(BaseExecutor):
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down Expand Up @@ -390,7 +390,7 @@ def __init__(
- do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python
* run_time_max (int): the maximum time the execution of the submitted Python
function is allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
Expand Down Expand Up @@ -518,8 +518,8 @@ def create_slurm_executor(
do not allow other tasks to use the same compute node.
* error_log_file (str): path to the error log file, primarily used to merge the log of
multiple tasks in one file.
* run_time_limit (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* run_time_max (int): the maximum time the execution of the submitted Python function is
allowed to take in seconds.
* priority (int): the queuing system priority assigned to a given Python function to
influence the scheduling.
* slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
Expand Down
8 changes: 4 additions & 4 deletions src/executorlib/standalone/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def generate_slurm_command(
openmpi_oversubscribe: bool = False,
slurm_cmd_args: Optional[list[str]] = None,
pmi_mode: Optional[str] = None,
run_time_limit: Optional[int] = None,
run_time_max: Optional[int] = None,
) -> list[str]:
"""
Generate the command list for the SLURM interface.
Expand All @@ -141,7 +141,7 @@ def generate_slurm_command(
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
run_time_max (int): The maximum runtime in seconds for each task. Default: None

Returns:
list[str]: The generated command list.
Expand All @@ -161,8 +161,8 @@ def generate_slurm_command(
command_prepend_lst += ["--exact"]
if openmpi_oversubscribe:
command_prepend_lst += ["--oversubscribe"]
if run_time_limit is not None:
command_prepend_lst += ["--time=" + str(run_time_limit // 60 + 1)]
if run_time_max is not None:
command_prepend_lst += ["--time=" + str(run_time_max // 60 + 1)]
if slurm_cmd_args is not None and len(slurm_cmd_args) > 0:
command_prepend_lst += slurm_cmd_args
return command_prepend_lst
2 changes: 1 addition & 1 deletion src/executorlib/standalone/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ResourceDictValidation(BaseModel):
num_nodes: Optional[int] = None
exclusive: Optional[bool] = None
error_log_file: Optional[str] = None
run_time_limit: Optional[int] = None
run_time_max: Optional[int] = None
priority: Optional[int] = None
slurm_cmd_args: Optional[list[str]] = None

Expand Down
2 changes: 1 addition & 1 deletion src/executorlib/task_scheduler/file/spawner_pysqa.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def execute_with_pysqa(
"command": " ".join(command),
"dependency_list": [str(qid) for qid in task_dependent_lst],
"working_directory": os.path.abspath(cwd),
"run_time_max": resource_dict.get("run_time_limit"),
"run_time_max": resource_dict.get("run_time_max"),
}
if "cwd" in resource_dict:
del resource_dict["cwd"]
Expand Down
10 changes: 5 additions & 5 deletions src/executorlib/task_scheduler/interactive/spawner_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class FluxPythonSpawner(BaseSpawner):
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
run_time_limit (int): The maximum runtime in seconds for each task. Default: None
run_time_max (int): The maximum runtime in seconds for each task. Default: None
"""

def __init__(
Expand All @@ -62,7 +62,7 @@ def __init__(
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
run_time_limit: Optional[int] = None,
run_time_max: Optional[int] = None,
):
super().__init__(
cwd=cwd,
Expand All @@ -80,7 +80,7 @@ def __init__(
self._flux_log_files = flux_log_files
self._priority = priority
self._future = None
self._run_time_limit = run_time_limit
self._run_time_max = run_time_max

def bootup(
self,
Expand Down Expand Up @@ -131,8 +131,8 @@ def bootup(
if self._cwd is not None:
jobspec.cwd = self._cwd
os.makedirs(self._cwd, exist_ok=True)
if self._run_time_limit is not None:
jobspec.duration = self._run_time_limit
if self._run_time_max is not None:
jobspec.duration = self._run_time_max
file_prefix = "flux_" + str(self._worker_id)
if self._flux_log_files and self._cwd is not None:
jobspec.stderr = os.path.join(self._cwd, file_prefix + ".err")
Expand Down
Loading
Loading