diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java index 5435873be4e62..ddc6c7026810d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java @@ -151,7 +151,19 @@ private void progressToStabilized(Temporal firstChangeEventTimestamp) { private void triggerTransitionToSubsequentState() { progressToPhase(new Transitioning(clock, this)); - transitionContext.transitionToSubsequentState(); + try { + transitionContext.transitionToSubsequentState(); + } catch (Throwable t) { + LOG.warn( + "Failed to transition to subsequent state for job {}. " + + "Resetting to Idling phase to allow future transition attempts.", + getJobId(), + t); + // Reset phase directly (bypassing progressToPhase guard) so the manager + // can respond to future resource changes and retry the transition. + phase = new Idling(clock, this); + throw t; + } } private void progressToPhase(Phase newPhase) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java index 885850f9b74dd..0bd27bf6efffb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java @@ -364,6 +364,42 @@ void testRevokedChangeInStabilizedPhase() { assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); } + @Test + void testManagerResetsToIdlingWhenTransitionToSubsequentStateFails() { + final TestingStateTransitionManagerContext ctx = + TestingStateTransitionManagerContext.stableContext().withDesiredResources(); + // Make transitionToSubsequentState() throw to simulate e.g. OOM during + // ExecutionGraph creation (FLINK-38997). + ctx.failOnTransition(new RuntimeException("Simulated failure during state transition")); + final DefaultStateTransitionManager testInstance = + ctx.createTestInstanceThatPassedCooldownPhase(); + + assertPhaseWithoutStateTransition(ctx, testInstance, Idling.class); + + // Trigger a change event to move to Stabilizing + testInstance.onChange(); + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + + // onTrigger would normally transition to Transitioning, but the context throws + try { + testInstance.onTrigger(); + } catch (RuntimeException expected) { + // expected: the exception from transitionToSubsequentState propagates + } + + // The manager should have recovered to Idling, NOT stuck in Transitioning + assertThat(testInstance.getPhase()).isInstanceOf(Idling.class); + + // Now stop failing and verify the manager can still process events + ctx.stopFailingOnTransition(); + ctx.clearStateTransition(); + ctx.withDesiredResources(); + testInstance.onChange(); + assertPhaseWithoutStateTransition(ctx, testInstance, Stabilizing.class); + testInstance.onTrigger(); + assertFinalStateTransitionHappened(ctx, testInstance); + } + @Test void testScheduledTaskBeingIgnoredAfterStateChanged() { final TestingStateTransitionManagerContext ctx = @@ -459,6 +495,7 @@ private static class TestingStateTransitionManagerContext // internal state used for assertions private final AtomicBoolean transitionTriggered = new AtomicBoolean(); + private RuntimeException transitionFailure = null; private final SortedMap>> scheduledTasks = new TreeMap<>(); @@ -507,6 +544,14 @@ public TestingStateTransitionManagerContext withSufficientResources() { return this; } + public void failOnTransition(RuntimeException failure) { + this.transitionFailure = failure; + } + + public void stopFailingOnTransition() { + this.transitionFailure = null; + } + // /////////////////////////////////////////////// // StateTransitionManager.Context interface methods // /////////////////////////////////////////////// @@ -524,6 +569,9 @@ public boolean hasDesiredResources() { @Override public void transitionToSubsequentState() { transitionTriggered.set(true); + if (transitionFailure != null) { + throw transitionFailure; + } } @Override