diff --git a/pyomo/contrib/solver/plugins.py b/pyomo/contrib/solver/plugins.py index c16c369abd3..1fa1f7faf05 100644 --- a/pyomo/contrib/solver/plugins.py +++ b/pyomo/contrib/solver/plugins.py @@ -38,7 +38,7 @@ def load(): doc='Direct interface to Gurobi accommodating general MINLP', )(GurobiDirectMINLP) SolverFactory.register( - name="highs", legacy_name="highs", doc="Persistent interface to HiGHS" + name="highs_persistent", legacy_name="highs_persistent_v2", doc="Persistent interface to HiGHS" )(Highs) SolverFactory.register(name='gams', legacy_name='gams_v2', doc='Interface to GAMS')( GAMS diff --git a/pyomo/contrib/solver/tests/solvers/test_solvers.py b/pyomo/contrib/solver/tests/solvers/test_solvers.py index 77c776780ff..b746f91870a 100644 --- a/pyomo/contrib/solver/tests/solvers/test_solvers.py +++ b/pyomo/contrib/solver/tests/solvers/test_solvers.py @@ -73,7 +73,7 @@ def param_as_standalone_func(cls, p, func, name): ('gurobi_direct', GurobiDirect), ('gurobi_direct_minlp', GurobiDirectMINLP), ('ipopt', Ipopt), - ('highs', Highs), + ('highs_persistent', Highs), ('gams', GAMS), ('knitro_direct', KnitroDirectSolver), ] @@ -81,7 +81,7 @@ def param_as_standalone_func(cls, p, func, name): ('gurobi_persistent', GurobiPersistent), ('gurobi_direct', GurobiDirect), ('gurobi_direct_minlp', GurobiDirectMINLP), - ('highs', Highs), + ('highs_persistent', Highs), ('knitro_direct', KnitroDirectSolver), ] nlp_solvers = [ @@ -95,7 +95,7 @@ def param_as_standalone_func(cls, p, func, name): ('ipopt', Ipopt), ('knitro_direct', KnitroDirectSolver), ] -qp_solvers = qcp_solvers + [("highs", Highs)] +qp_solvers = qcp_solvers + [("highs_persistent", Highs)] miqcqp_solvers = [ ('gurobi_direct_minlp', GurobiDirectMINLP), ('gurobi_persistent', GurobiPersistent), @@ -2281,7 +2281,7 @@ def test_scaling(self, name: str, opt_class: Type[SolverBase], use_presolve: boo self.assertAlmostEqual(rc[m.x], 1) self.assertAlmostEqual(rc[m.y], 0) - @mark_parameterized.expand(input=_load_tests([("highs", Highs)])) + @mark_parameterized.expand(input=_load_tests([("highs_persistent", Highs)])) def test_node_limit( self, name: str, opt_class: Type[SolverBase], use_presolve: bool ): diff --git a/pyomo/solvers/plugins/solvers/HIGHS.py b/pyomo/solvers/plugins/solvers/HIGHS.py new file mode 100644 index 00000000000..3768f2ec749 --- /dev/null +++ b/pyomo/solvers/plugins/solvers/HIGHS.py @@ -0,0 +1,256 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +import os +import re +import logging +import subprocess + +from pyomo.common import Executable +from pyomo.common.enums import minimize +from pyomo.common.collections import Bunch +from pyomo.common.tempfiles import TempfileManager + +from pyomo.opt.base import ProblemFormat, ResultsFormat, OptSolver +from pyomo.opt.base.solvers import _extract_version, SolverFactory +from pyomo.opt.results import ( + SolverResults, + SolverStatus, + TerminationCondition, + SolutionStatus, + Solution, +) +from pyomo.opt.solver import SystemCallSolver + +logger = logging.getLogger('pyomo.solvers') + + +@SolverFactory.register('highs', doc='The HiGHS LP/MIP solver') +class HIGHS(OptSolver): + """The HiGHS LP/MIP solver""" + + def __new__(cls, *args, **kwds): + mode = kwds.pop('solver_io', 'lp') + if mode == 'lp' or mode == 'mps' or mode is None: + opt = SolverFactory('_highs_shell', **kwds) + if mode == 'mps': + opt.set_problem_format(ProblemFormat.mps) + else: + opt.set_problem_format(ProblemFormat.cpxlp) + return opt + elif mode == 'direct' or mode == 'python': + return SolverFactory('highs_persistent_v2', **kwds) + else: + logger.error('Unknown IO type: %s' % mode) + return SolverFactory('_failsafe_unknown_solver') + + +@SolverFactory.register('_highs_shell', doc='Shell interface to the HiGHS solver') +class HIGHSSHELL(SystemCallSolver): + """Shell interface to the HiGHS LP/MIP solver""" + + def __init__(self, **kwds): + kwds['type'] = 'highs' + super(HIGHSSHELL, self).__init__(**kwds) + + self._valid_problem_formats = [ + ProblemFormat.cpxlp, + ProblemFormat.mps, + ] + self._valid_result_formats = { + ProblemFormat.cpxlp: [ResultsFormat.soln], + ProblemFormat.mps: [ResultsFormat.soln], + } + + self._capabilities = Bunch() + self._capabilities.linear = True + self._capabilities.integer = True + self._capabilities.quadratic_objective = True + self._capabilities.quadratic_constraint = False + self._capabilities.sos1 = False + self._capabilities.sos2 = False + + self.set_problem_format(ProblemFormat.cpxlp) + self._timelimit = None + + def _default_results_format(self, prob_format): + return ResultsFormat.soln + + def _default_executable(self): + executable = Executable("highs") + if not executable: + logger.warning( + "Could not locate the 'highs' executable, which is " + "required for solver %s" % self.name + ) + self.enable = False + return None + return executable.path() + + def _get_version(self): + result = subprocess.run( + [self.executable(), "--version"], + timeout=5, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) + return _extract_version(result.stdout) + + def create_command_line(self, executable, problem_files): + if self._log_file is None: + self._log_file = TempfileManager.create_tempfile(suffix=".highs.log") + + problem_filename_prefix = problem_files[0] + if '.' in problem_filename_prefix: + tmp = problem_filename_prefix.split('.') + if len(tmp) > 2: + problem_filename_prefix = '.'.join(tmp[:-1]) + else: + problem_filename_prefix = tmp[0] + + self._soln_file = problem_filename_prefix + ".sol" + + cmd = [executable, problem_files[0]] + + if self._timelimit is not None and self._timelimit > 0.0: + cmd.extend(['--time_limit', str(self._timelimit)]) + + cmd.extend(['--solution_file', self._soln_file]) + + for key, val in self.options.items(): + if val is None or (isinstance(val, str) and val.strip() == ''): + cmd.append(f"--{key}") + else: + cmd.extend([f"--{key}", str(val)]) + + return Bunch(cmd=cmd, log_file=self._log_file, env=None) + + def process_logfile(self): + results = SolverResults() + soln = Solution() + + results.problem.name = None + results.problem.sense = minimize + optim_value = None + + try: + with open(self._log_file, 'r') as f: + output = f.read() + except Exception: + output = "" + + for line in output.split("\n"): + line = line.strip() + if line.startswith("Model status"): + status_str = line.split(":")[-1].strip().lower() + if status_str == "optimal": + results.solver.status = SolverStatus.ok + results.solver.termination_condition = TerminationCondition.optimal + soln.status = SolutionStatus.optimal + elif status_str == "infeasible": + results.solver.status = SolverStatus.warning + results.solver.termination_condition = TerminationCondition.infeasible + soln.status = SolutionStatus.infeasible + elif status_str == "unbounded": + results.solver.status = SolverStatus.warning + results.solver.termination_condition = TerminationCondition.unbounded + soln.status = SolutionStatus.unbounded + elif status_str in ('infeasible or unbounded', 'infeasibleorunbounded'): + results.solver.status = SolverStatus.warning + results.solver.termination_condition = TerminationCondition.infeasibleOrUnbounded + soln.status = SolutionStatus.infeasible + elif status_str in ('time limit reached', 'solution limit reached'): + results.solver.status = SolverStatus.aborted + results.solver.termination_condition = TerminationCondition.maxTimeLimit + soln.status = SolutionStatus.stoppedByLimit + elif status_str == 'iteration limit reached': + results.solver.status = SolverStatus.aborted + results.solver.termination_condition = TerminationCondition.maxIterations + soln.status = SolutionStatus.stoppedByLimit + elif line.startswith("Objective value"): + try: + optim_value = float(line.split(":")[-1].strip()) + except ValueError: + pass + + if soln.status is SolutionStatus.optimal and optim_value is not None: + soln.objective['__default_objective__'] = {'Value': optim_value} + + if soln.status in [SolutionStatus.optimal, SolutionStatus.stoppedByLimit]: + results.solution.insert(soln) + + return results + + def process_soln_file(self, results): + if len(results.solution) == 0: + return + + soln = results.solution[0] + + extract_duals = False + extract_reduced_costs = False + for suffix in self._suffixes: + if re.match(suffix, "dual"): + extract_duals = True + elif re.match(suffix, "rc"): + extract_reduced_costs = True + else: + raise RuntimeError( + f"***HiGHS solver plugin cannot extract solution suffix={suffix}" + ) + + if not os.path.exists(self._soln_file): + return + + with open(self._soln_file, 'r') as f: + lines = f.readlines() + + section = None + for line in lines: + line = line.strip() + if not line: + continue + + if line.startswith("Columns"): + section = "columns" + continue + elif line.startswith("Rows"): + section = "rows" + continue + elif line.startswith("Model status"): + section = None + continue + elif line.startswith("Index Status"): + continue + + if section == "columns": + parts = line.split() + if len(parts) >= 7: + try: + primal = float(parts[4]) + dual = float(parts[5]) + name = parts[6] + if extract_reduced_costs: + soln.variable[name] = {"Value": primal, "Rc": dual} + else: + soln.variable[name] = {"Value": primal} + except ValueError: + pass + + elif section == "rows": + parts = line.split() + if len(parts) >= 7: + try: + dual = float(parts[5]) + name = parts[6] + if extract_duals: + soln.constraint[name] = {"Dual": dual} + except ValueError: + pass diff --git a/pyomo/solvers/plugins/solvers/__init__.py b/pyomo/solvers/plugins/solvers/__init__.py index b29b199d055..66661d89a9d 100644 --- a/pyomo/solvers/plugins/solvers/__init__.py +++ b/pyomo/solvers/plugins/solvers/__init__.py @@ -15,6 +15,7 @@ GUROBI, BARON, ASL, + HIGHS, pywrapper, SCIPAMPL, CONOPT, diff --git a/pyomo/solvers/plugins/solvers/tests/test_highs_shell.py b/pyomo/solvers/plugins/solvers/tests/test_highs_shell.py new file mode 100644 index 00000000000..4dce6ab0171 --- /dev/null +++ b/pyomo/solvers/plugins/solvers/tests/test_highs_shell.py @@ -0,0 +1,300 @@ +# ____________________________________________________________________________________ +# +# Pyomo: Python Optimization Modeling Objects +# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC +# Under the terms of Contract DE-NA0003525 with National Technology and Engineering +# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this +# software. This software is distributed under the 3-clause BSD License. +# ____________________________________________________________________________________ + +import os +import tempfile +import unittest + +import pyomo.environ as pyo +from pyomo.opt.base import ProblemFormat +from pyomo.opt.results import SolverStatus, SolutionStatus, TerminationCondition +from pyomo.solvers.plugins.solvers.HIGHS import HIGHSSHELL + + +def _make_lp_model(): + m = pyo.ConcreteModel() + m.x = pyo.Var(bounds=(0, 1)) + m.y = pyo.Var(bounds=(0, 1)) + m.obj = pyo.Objective(expr=m.x + m.y, sense=pyo.maximize) + m.c1 = pyo.Constraint(expr=m.x + m.y <= 1.5) + return m + + +def _highs_available(): + s = HIGHSSHELL() + return s.available(exception_flag=False) + + +_requires_highs = unittest.skipUnless( + _highs_available(), "HiGHS executable not found on PATH" +) + + +class TestHighsSolverFactoryDispatch(unittest.TestCase): + def test_default_returns_shell(self): + s = pyo.SolverFactory('highs') + self.assertIsInstance(s, HIGHSSHELL) + + def test_lp_mode_returns_shell(self): + s = pyo.SolverFactory('highs', solver_io='lp') + self.assertIsInstance(s, HIGHSSHELL) + self.assertEqual(s._problem_format, ProblemFormat.cpxlp) + + def test_mps_mode_returns_shell(self): + s = pyo.SolverFactory('highs', solver_io='mps') + self.assertIsInstance(s, HIGHSSHELL) + self.assertEqual(s._problem_format, ProblemFormat.mps) + + def test_direct_mode_not_shell(self): + s = pyo.SolverFactory('highs', solver_io='direct') + self.assertNotIsInstance(s, HIGHSSHELL) + + def test_highs_shell_name_registered(self): + s = pyo.SolverFactory('_highs_shell') + self.assertIsInstance(s, HIGHSSHELL) + + def test_unknown_mode_returns_unknown_solver(self): + from pyomo.opt.base.solvers import UnknownSolver + with self.assertLogs('pyomo.solvers', level='ERROR'): + s = pyo.SolverFactory('highs', solver_io='nonexistent_mode') + self.assertIsInstance(s, UnknownSolver) + + +class TestHighsShellCommandLine(unittest.TestCase): + def setUp(self): + self.solver = HIGHSSHELL() + self._log_file = tempfile.NamedTemporaryFile(suffix='.highs.log', delete=False) + self._log_file.close() + self.solver._log_file = self._log_file.name + + def tearDown(self): + os.unlink(self._log_file.name) + + def _make_cmd(self, problem_file, **opts): + self.solver.options.update(opts) + return self.solver.create_command_line( + executable='highs', problem_files=[problem_file] + ) + + def test_basic_command_structure(self): + b = self._make_cmd('model.lp') + self.assertEqual(b.cmd[0], 'highs') + self.assertEqual(b.cmd[1], 'model.lp') + self.assertIn('--solution_file', b.cmd) + + def test_solution_file_derived_from_problem_file(self): + b = self._make_cmd('/tmp/problem.lp') + idx = b.cmd.index('--solution_file') + self.assertEqual(b.cmd[idx + 1], '/tmp/problem.sol') + + def test_time_limit_added(self): + self.solver._timelimit = 60 + b = self._make_cmd('model.lp') + self.assertIn('--time_limit', b.cmd) + self.assertEqual(b.cmd[b.cmd.index('--time_limit') + 1], '60') + + def test_extra_options_forwarded(self): + b = self._make_cmd('model.lp', presolve='off', simplex_scale_strategy=2) + self.assertIn('--presolve', b.cmd) + self.assertIn('--simplex_scale_strategy', b.cmd) + + def test_solution_file_attribute_set(self): + self._make_cmd('model.lp') + self.assertTrue(self.solver._soln_file.endswith('.sol')) + + +class TestHighsShellLogfileParsing(unittest.TestCase): + def setUp(self): + self.solver = HIGHSSHELL() + self._log_file = tempfile.NamedTemporaryFile( + suffix='.highs.log', delete=False, mode='w' + ) + self._log_file.close() + self.solver._log_file = self._log_file.name + + def tearDown(self): + os.unlink(self._log_file.name) + + def _write_log(self, content): + with open(self._log_file.name, 'w') as f: + f.write(content) + + def test_optimal(self): + self._write_log( + "Model status : Optimal\n" + "Objective value : 1.5000000000e+00\n" + ) + results = self.solver.process_logfile() + self.assertEqual(results.solver.status, SolverStatus.ok) + self.assertEqual( + results.solver.termination_condition, TerminationCondition.optimal + ) + self.assertEqual(len(results.solution), 1) + self.assertAlmostEqual( + results.solution[0].objective['__default_objective__']['Value'], 1.5 + ) + + def test_infeasible(self): + self._write_log("Model status : Infeasible\n") + results = self.solver.process_logfile() + self.assertEqual(results.solver.status, SolverStatus.warning) + self.assertEqual( + results.solver.termination_condition, TerminationCondition.infeasible + ) + self.assertEqual(len(results.solution), 0) + + def test_unbounded(self): + self._write_log("Model status : Unbounded\n") + results = self.solver.process_logfile() + self.assertEqual( + results.solver.termination_condition, TerminationCondition.unbounded + ) + + def test_infeasible_or_unbounded(self): + self._write_log("Model status : Infeasible or unbounded\n") + results = self.solver.process_logfile() + self.assertEqual( + results.solver.termination_condition, + TerminationCondition.infeasibleOrUnbounded, + ) + + def test_time_limit(self): + self._write_log("Model status : Time limit reached\n") + results = self.solver.process_logfile() + self.assertEqual( + results.solver.termination_condition, TerminationCondition.maxTimeLimit + ) + + def test_iteration_limit(self): + self._write_log("Model status : Iteration limit reached\n") + results = self.solver.process_logfile() + self.assertEqual( + results.solver.termination_condition, TerminationCondition.maxIterations + ) + + def test_empty_log_does_not_crash(self): + self._write_log("") + self.assertIsNotNone(self.solver.process_logfile()) + + +class TestHighsShellSolnFileParsing(unittest.TestCase): + _SAMPLE_SOL = ( + "Columns\n" + " Index Status Lower Upper Primal Dual Name\n" + " 0 BS 0 1 0.5 -0 x\n" + " 1 UB 0 1 1 -0 y\n" + "Rows\n" + " Index Status Lower Upper Primal Dual Name\n" + " 0 UB -inf 1.5 1.5 1 c1\n" + "\n" + "Model status: Optimal\n" + ) + + def setUp(self): + self.solver = HIGHSSHELL() + self._sol_file = tempfile.NamedTemporaryFile( + suffix='.sol', delete=False, mode='w' + ) + self._sol_file.write(self._SAMPLE_SOL) + self._sol_file.close() + self.solver._soln_file = self._sol_file.name + + def tearDown(self): + os.unlink(self._sol_file.name) + + def _results_with_soln(self): + from pyomo.opt.results import SolverResults, Solution + r = SolverResults() + r.solution.insert(Solution()) + return r + + def test_variable_values_extracted(self): + results = self._results_with_soln() + self.solver._suffixes = [] + self.solver.process_soln_file(results) + soln = results.solution[0] + self.assertAlmostEqual(soln.variable['x']['Value'], 0.5) + self.assertAlmostEqual(soln.variable['y']['Value'], 1.0) + + def test_duals_extracted_when_requested(self): + results = self._results_with_soln() + self.solver._suffixes = ['dual'] + self.solver.process_soln_file(results) + self.assertAlmostEqual(results.solution[0].constraint['c1']['Dual'], 1.0) + + def test_reduced_costs_extracted_when_requested(self): + results = self._results_with_soln() + self.solver._suffixes = ['rc'] + self.solver.process_soln_file(results) + self.assertIn('Rc', results.solution[0].variable['x']) + + def test_missing_soln_file_does_not_crash(self): + self.solver._soln_file = '/nonexistent/path.sol' + results = self._results_with_soln() + self.solver._suffixes = [] + self.solver.process_soln_file(results) + + def test_empty_solution_list_skipped(self): + from pyomo.opt.results import SolverResults + results = SolverResults() + self.solver._suffixes = [] + self.solver.process_soln_file(results) + + +@_requires_highs +class TestHighsShellSolveRoundTrip(unittest.TestCase): + def _solve(self, model, **kwargs): + return pyo.SolverFactory('highs').solve(model, **kwargs) + + def test_optimal_lp(self): + m = _make_lp_model() + res = self._solve(m) + self.assertEqual(res.solver.status, SolverStatus.ok) + self.assertEqual( + res.solver.termination_condition, TerminationCondition.optimal + ) + self.assertAlmostEqual(pyo.value(m.x) + pyo.value(m.y), 1.5, places=4) + + def test_objective_value(self): + m = _make_lp_model() + res = self._solve(m) + obj_val = res.solution[0].objective['__default_objective__']['Value'] + self.assertAlmostEqual(obj_val, 1.5, places=4) + + def test_mps_mode(self): + m = _make_lp_model() + opt = pyo.SolverFactory('highs', solver_io='mps') + res = opt.solve(m) + self.assertEqual( + res.solver.termination_condition, TerminationCondition.optimal + ) + + def test_infeasible_lp(self): + m = pyo.ConcreteModel() + m.x = pyo.Var(bounds=(0, 1)) + m.obj = pyo.Objective(expr=m.x) + m.c1 = pyo.Constraint(expr=m.x >= 2) + res = self._solve(m) + self.assertEqual( + res.solver.termination_condition, TerminationCondition.infeasible + ) + + def test_timelimit_respected(self): + m = _make_lp_model() + opt = pyo.SolverFactory('highs') + opt._timelimit = 0.001 + res = opt.solve(m) + self.assertIn( + res.solver.termination_condition, + {TerminationCondition.optimal, TerminationCondition.maxTimeLimit}, + ) + + +if __name__ == '__main__': + unittest.main()