diff --git a/pyomo/network/port.py b/pyomo/network/port.py index f56842096d5..62c5dcfc50c 100644 --- a/pyomo/network/port.py +++ b/pyomo/network/port.py @@ -31,6 +31,7 @@ from pyomo.network.util import create_var, tighten_var_domain + logger = logging.getLogger('pyomo.network') @@ -335,6 +336,7 @@ def __init__(self, *args, **kwd): self._initialize = kwd.pop('initialize', {}) self._implicit = kwd.pop('implicit', {}) self._extends = kwd.pop('extends', None) + self._auto_created_arcs = [] kwd.setdefault('ctype', Port) IndexedComponent.__init__(self, *args, **kwd) @@ -375,6 +377,62 @@ def construct(self, data=None): timer.report() + def connect_to(self, port, block=None): + """Method for connecting current port to another port via an Arc + Args: + port: Port + The destination port to connect to + block: Block, optional + The block on which to construct the Arc. If None, the Arc will be constructed on the port parent block + """ + # NOTE import Arc here to avoid circular import issues, this is the only place in the Port class where Arc is needed + from pyomo.network.arc import Arc + + if block is None: + block = self.parent_block() + if block is None: + raise ValueError( + "Cannot connect Port '%s' to Port '%s' because neither port has a parent block. Please specify a block to construct the Arc on." + % (self.name, port.name) + ) + + def get_safe_name(name): + return ( + name.replace(".", "_") + .replace("-", "") + .replace("[", "_") + .replace("]", "") + ) + + current_port_name = get_safe_name(self.name) + dest_port_name = get_safe_name(port.name) + + arc_name = f"{current_port_name}_to_{dest_port_name}" + block.add_component(arc_name, Arc(source=self, destination=port)) + created_arc = block.find_component(arc_name) + if created_arc is None: + raise RuntimeError( + f"Failed to create Arc '{arc_name}' connecting Port '{self.name}' to Port '{port.name}'." + ) + logger.info( + f"Created Arc '{arc_name}' connecting Port '{self.name}' to Port '{port.name}'." + ) + self._auto_created_arcs.append(created_arc) + + def get_connections(self): + """Returns a single or list of Arcs that were automatically created by the `connect_to` method, + this list can be used for simple propagation or management of automatically created arcs, + if no Arc is created None is returned.""" + if len(self._auto_created_arcs) == 0: + logger.warning( + f"No automatically created arcs found for Port '{self.name}'." + ) + return None + elif len(self._auto_created_arcs) == 1: + return self._auto_created_arcs[0] + else: + return self._auto_created_arcs + def _initialize_members(self, initSet): for idx in initSet: tmp = self[idx] diff --git a/pyomo/network/tests/test_arc.py b/pyomo/network/tests/test_arc.py index f3c26285dc7..446b3c2db72 100644 --- a/pyomo/network/tests/test_arc.py +++ b/pyomo/network/tests/test_arc.py @@ -1986,6 +1986,73 @@ def test_clone(self): m2.y.value = 1.25 self.assertAlmostEqual(value(c.body), 0) + def test_clone(self): + m = ConcreteModel() + m.x = Var() + m.y = Var() + m.p1 = Port() + m.p2 = Port() + m.p1.add(m.x, 'v') + m.p2.add(m.y, 'v') + m.arc = Arc(source=m.p1, destination=m.p2) + + m2 = m.clone() + self.assertEqual(len(m2.p1.arcs()), 1) + self.assertEqual(len(m2.p2.arcs()), 1) + self.assertIs(m2.p1.arcs()[0], m2.arc) + self.assertIs(m2.p2.arcs()[0], m2.arc) + + self.assertIsNot(m2.p1.arcs()[0], m.arc) + self.assertIsNot(m2.p2.arcs()[0], m.arc) + + TransformationFactory('network.expand_arcs').apply_to(m2) + all_cons = list(m2.component_data_objects(Constraint)) + self.assertEqual(len(all_cons), 1) + c = all_cons[0] + self.assertAlmostEqual(value(c.lower), 0) + self.assertAlmostEqual(value(c.upper), 0) + c_vars = ComponentSet(identify_variables(c.body)) + self.assertIn(m2.x, c_vars) + self.assertIn(m2.y, c_vars) + self.assertNotIn(m.x, c_vars) + self.assertNotIn(m.y, c_vars) + m2.x.value = 1.25 + m2.y.value = 1.25 + self.assertAlmostEqual(value(c.body), 0) + + def test_expand_auto_connect(self): + m = ConcreteModel() + m.x = Var() + m.prt = Port() + m.prt.add(m.x, "a") + m.prt.connect_to(m.prt) + created_arc = m.find_component("prt_to_prt") + self.assertEqual(len(list(m.component_objects(Constraint))), 0) + self.assertEqual(len(list(m.component_data_objects(Constraint))), 0) + + TransformationFactory('network.expand_arcs').apply_to(m) + + self.assertEqual(len(list(m.component_objects(Constraint))), 1) + self.assertEqual(len(list(m.component_data_objects(Constraint))), 1) + self.assertFalse(created_arc.active) + blk = m.component('prt_to_prt_expanded') + self.assertTrue(blk.active) + self.assertTrue(blk.component('a_equality').active) + + os = StringIO() + blk.pprint(ostream=os) + self.assertEqual( + os.getvalue(), + """prt_to_prt_expanded : Size=1, Index=None, Active=True + 1 Constraint Declarations + a_equality : Size=1, Index=None, Active=True + Key : Lower : Body : Upper : Active + None : 0.0 : x - x : 0.0 : True + + 1 Declarations: a_equality +""", + ) + if __name__ == "__main__": unittest.main() diff --git a/pyomo/network/tests/test_port.py b/pyomo/network/tests/test_port.py index f62a2b8edd8..1f09b1525e0 100644 --- a/pyomo/network/tests/test_port.py +++ b/pyomo/network/tests/test_port.py @@ -16,6 +16,7 @@ from pyomo.environ import ( ConcreteModel, AbstractModel, + Block, Var, Set, NonNegativeReals, @@ -552,6 +553,62 @@ def _IN(m, i): """, ) + def test_auto_connect(self): + m = ConcreteModel() + m.x = Var() + m.y = Var() + m.p1 = Port() + m.p1.add(m.x) + m.p2 = Port() + m.p2.add(m.y) + m.p1.connect_to(m.p2) + + self.assertIs(m.p1.x, m.x) + self.assertIs(m.p2.y, m.y) + assert m.find_component('p1_to_p2') is not None + assert m.p1.get_connections() is m.find_component('p1_to_p2') + assert m.p2.get_connections() is None + + def test_auto_connect_with_block(self): + m = ConcreteModel() + m.block_a = Block() + m.block_b = Block() + m.block_a.x = Var() + m.block_b.y = Var() + m.block_a.p1 = Port() + m.block_a.p1.add(m.block_a.x) + m.block_b.p2 = Port() + m.block_b.p2.add(m.block_b.y) + m.block_a.p1.connect_to(m.block_b.p2) + + self.assertIs(m.block_a.p1.x, m.block_a.x) + self.assertIs(m.block_b.p2.y, m.block_b.y) + assert m.block_a.find_component('block_a_p1_to_block_b_p2') is not None + assert m.block_a.p1.get_connections() is m.block_a.find_component( + 'block_a_p1_to_block_b_p2' + ) + assert m.block_b.p2.get_connections() is None + + def test_auto_connect_with_indexed_block(self): + m = ConcreteModel() + m.block = Block([1, 2]) + m.block[1].x = Var() + m.block[2].y = Var() + m.block[1].p1 = Port() + m.block[1].p1.add(m.block[1].x) + m.block[2].p2 = Port() + m.block[2].p2.add(m.block[2].y) + m.block[1].p1.connect_to(m.block[2].p2) + + self.assertIs(m.block[1].p1.x, m.block[1].x) + self.assertIs(m.block[2].p2.y, m.block[2].y) + assert m.block[1].find_component('block_1_p1_to_block_2_p2') is not None + + assert m.block[1].p1.get_connections() is m.block[1].find_component( + 'block_1_p1_to_block_2_p2' + ) + assert m.block[2].p2.get_connections() is None + if __name__ == "__main__": unittest.main()