diff --git a/changes/10050.feature.md b/changes/10050.feature.md new file mode 100644 index 00000000000..c956a901688 --- /dev/null +++ b/changes/10050.feature.md @@ -0,0 +1 @@ +Implement Blue-Green deployment strategy diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index ba996f4fa21..6b9bf0572b6 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -583,6 +583,18 @@ def termination_priority(self) -> int: return 0 return _HEALTH_TERMINATION_PRIORITY.get(self.health_status, 0) + @property + def is_ready(self) -> bool: + """Return True if the route is RUNNING and HEALTHY. + + Does NOT check traffic_status — blue-green green routes are INACTIVE + until promotion but must still be classified as "ready" for the + auto-promote health gate. + """ + return ( + self.status == RouteStatus.RUNNING and self.health_status == RouteHealthStatus.HEALTHY + ) + @dataclass class DeploymentInfoWithRoutes: diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index 78e2d5b706a..e55160e7373 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2526,6 +2526,38 @@ async def update_endpoint( return row.to_deployment_info() + async def set_current_revision( + self, + endpoint_id: uuid.UUID, + revision_id: uuid.UUID, + ) -> bool: + """Set current_revision on an endpoint that has none yet. + + Used for the initial deployment path: when ``create_deployment`` is + called with an ``initial_revision`` there is no prior "blue" revision + to swap from, so we skip the strategy FSM entirely and mark the + revision as current directly. Subsequent revision updates follow the + normal activate → deploying_revision → strategy-driven swap flow. 
+ + Uses ``current_revision IS NULL`` as a guard to prevent clobbering an + already-active revision. + + Returns: + ``True`` if the endpoint was updated, ``False`` if it already had + a current_revision (guard fired). + """ + async with self._begin_session_read_committed() as db_sess: + update_query = ( + sa.update(EndpointRow) + .where( + EndpointRow.id == endpoint_id, + EndpointRow.current_revision.is_(None), + ) + .values(current_revision=revision_id) + ) + result = await db_sess.execute(update_query) + return cast(CursorResult[Any], result).rowcount > 0 + async def set_deploying_revision( self, endpoint_id: uuid.UUID, @@ -2917,6 +2949,41 @@ async def _complete_deployment_revision_swap( result = await db_sess.execute(query) return cast(CursorResult[Any], result).rowcount + async def complete_manual_promote( + self, + deployment_id: uuid.UUID, + promote: BatchUpdater[RoutingRow] | None, + drain: BatchUpdater[RoutingRow] | None, + ) -> int: + """Atomically finalize a manually-promoted deployment. + + Manual promote bypasses the FSM coordinator, so the DB update must + perform every state change that the coordinator would have done + for an auto-promoted deployment: route mutations, revision swap, + sub_step clear, and the DEPLOYING → READY lifecycle transition — + all in one transaction. + + Returns the number of deployments whose revision was swapped (0 or 1). 
+ """ + async with self._begin_session_read_committed() as db_sess: + await self._promote_routes(db_sess, promote) + await self._drain_routes(db_sess, drain) + swap_query = ( + sa.update(EndpointRow) + .where( + EndpointRow.id == deployment_id, + EndpointRow.deploying_revision.is_not(None), + ) + .values( + current_revision=EndpointRow.deploying_revision, + deploying_revision=None, + sub_step=None, + lifecycle_stage=EndpointLifecycle.READY, + ) + ) + result = await db_sess.execute(swap_query) + return cast(CursorResult[Any], result).rowcount + async def clear_deploying_revision( self, deployment_ids: set[uuid.UUID], diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index 946aa6316d7..9fd196a7231 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -1306,6 +1306,22 @@ async def update_endpoint( """ return await self._db_source.update_endpoint(updater) + @deployment_repository_resilience.apply() + async def set_current_revision( + self, + endpoint_id: uuid.UUID, + revision_id: uuid.UUID, + ) -> bool: + """Set current_revision on an endpoint that has none yet. + + Used for the initial deployment path; skips the strategy FSM. + + Returns: + ``True`` if updated, ``False`` if the endpoint already had a + current_revision (guard fired). + """ + return await self._db_source.set_current_revision(endpoint_id, revision_id) + @deployment_repository_resilience.apply() async def set_deploying_revision( self, @@ -1631,13 +1647,15 @@ async def promote_deployment( promote_route_ids: list[UUID], drain_route_ids: list[UUID], ) -> int: - """Promote a deployment by switching traffic from blue to green routes. + """Finalize a manually-promoted blue-green deployment. - Atomically promotes green routes (→ACTIVE), drains blue routes (→TERMINATING), - and swaps deploying_revision → current_revision. 
+ Atomically promotes green routes (→ACTIVE), drains blue routes + (→TERMINATING), swaps deploying_revision → current_revision, and + transitions lifecycle DEPLOYING → READY. Manual promote bypasses + the FSM coordinator, so the lifecycle transition must happen here + rather than in a later handler tick. - Returns: - Number of deployments whose revision was swapped (0 or 1). + Returns the number of deployments whose revision was swapped (0 or 1). """ promote: BatchUpdater[RoutingRow] | None = None if promote_route_ids: @@ -1660,11 +1678,10 @@ async def promote_deployment( conditions=[RouteIdConditions.by_ids(drain_route_ids)], ) - return await self._db_source.apply_strategy_mutations( - rollout=[], - drain=drain, + return await self._db_source.complete_manual_promote( + deployment_id=deployment_id, promote=promote, - completed_ids={deployment_id}, + drain=drain, ) @deployment_repository_resilience.apply() diff --git a/src/ai/backend/manager/services/deployment/service.py b/src/ai/backend/manager/services/deployment/service.py index 3de6699c8f8..2f7191f2a24 100644 --- a/src/ai/backend/manager/services/deployment/service.py +++ b/src/ai/backend/manager/services/deployment/service.py @@ -496,8 +496,18 @@ async def create_deployment( creator, resolved_creator.policy ) - # Create initial revision if provided, via the same path as add_model_revision - # to ensure preset/merge/resolve logic is applied consistently. + # Initial revision handling. + # + # If no initial_revision is provided: leave current_revision=None. The + # deployment has nothing to serve; no routes are created. The caller + # must later add a revision and invoke activate_revision to trigger a + # deployment cycle. + # + # If an initial_revision is provided: register the revision and mark + # it as the current_revision directly. There is no prior revision to + # "blue-green" against, so we bypass the strategy FSM and schedule + # ACTIVE routes via the normal check_pending path. 
The blue-green / + # rolling strategy only applies to *subsequent* revision activations. if revision is not None: add_result = await self.add_model_revision( AddModelRevisionAction( @@ -505,11 +515,11 @@ async def create_deployment( adder=revision, ) ) - await self.activate_revision( - ActivateRevisionAction( - deployment_id=deployment_info.id, - revision_id=add_result.revision.id, - ) + await self._deployment_repository.set_current_revision( + deployment_info.id, add_result.revision.id + ) + await self._deployment_controller.mark_lifecycle_needed( + DeploymentLifecycleType.CHECK_PENDING ) # Re-fetch deployment info to include the created revision diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index 13a570cde2f..8d98c63a032 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -43,7 +43,6 @@ DeploymentLifecycleStatus, DeploymentLifecycleSubStep, DeploymentStatusTransitions, - RouteHealthStatus, RouteStatus, ) from ai.backend.manager.data.model_serving.types import EndpointLifecycle @@ -342,7 +341,7 @@ async def _classify_routes( drain_route_ids: list[uuid.UUID] = [] for route in route_search.items: if route.revision_id == info.deploying_revision_id: - if route.health_status == RouteHealthStatus.HEALTHY: + if route.is_ready: promote_route_ids.append(route.route_id) elif route.status.is_active(): drain_route_ids.append(route.route_id) diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index 24ce3d775bb..e37a8de6db0 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -36,13 +36,18 @@ class StrategyApplyResult: routes_drained: int = 0 """Number of routes marked for draining.""" + routes_promoted: int = 
0 + """Number of routes promoted (traffic_status -> ACTIVE).""" + def has_mutations(self) -> bool: """Check if there are any route mutations to persist. Returns True when at least one of the following is present: - new routes to roll out, routes to drain, or deployments completed. + new routes to roll out, routes to drain, routes to promote, or deployments completed. """ - return bool(self.completed_ids or self.routes_created or self.routes_drained) + return bool( + self.completed_ids or self.routes_created or self.routes_drained or self.routes_promoted + ) class StrategyResultApplier: @@ -50,7 +55,8 @@ class StrategyResultApplier: Handles route mutations from a strategy evaluation cycle: 1. Route rollout (create) and drain (terminate) - 2. Revision swap for COMPLETED deployments + 2. Route promotion (traffic_status -> ACTIVE) + 3. Revision swap for COMPLETED deployments Sub-step transitions are handled exclusively by the coordinator. """ @@ -69,6 +75,7 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult completed_ids=completed_ids, routes_created=len(changes.rollout_specs), routes_drained=len(changes.drain_route_ids), + routes_promoted=len(changes.promote_route_ids), ) drain: BatchUpdater[RoutingRow] | None = None @@ -82,6 +89,16 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult conditions=[RouteConditions.by_ids(changes.drain_route_ids)], ) + promote: BatchUpdater[RoutingRow] | None = None + if changes.promote_route_ids: + promote = BatchUpdater( + spec=RouteBatchUpdaterSpec( + traffic_ratio=1.0, + traffic_status=RouteTrafficStatus.ACTIVE, + ), + conditions=[RouteConditions.by_ids(changes.promote_route_ids)], + ) + rollout = changes.rollout_specs if not result.has_mutations(): @@ -90,14 +107,16 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult swapped = await self._deployment_repo.apply_strategy_mutations( rollout=rollout, drain=drain, - promote=None, + 
promote=promote, completed_ids=completed_ids, ) log.debug( - "Applied evaluation: {} routes created, {} routes drained", + "Applied evaluation: {} assignments, {} routes created, {} routes drained, {} routes promoted", + len(summary.assignments), result.routes_created, result.routes_drained, + result.routes_promoted, ) if completed_ids: log.info( diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py index 683873c7295..b4928721775 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/blue_green.py @@ -6,16 +6,58 @@ from __future__ import annotations +import logging +import uuid from collections.abc import Sequence +from dataclasses import dataclass, field from typing import override +from ai.backend.common.data.permission.types import RBACElementType +from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( DeploymentInfo, + DeploymentLifecycleSubStep, + RouteHealthStatus, RouteInfo, + RouteStatus, + RouteTrafficStatus, ) -from ai.backend.manager.models.deployment_policy import DeploymentStrategySpec +from ai.backend.manager.data.permission.types import RBACElementRef +from ai.backend.manager.models.deployment_policy import BlueGreenSpec, DeploymentStrategySpec +from ai.backend.manager.models.routing import RoutingRow +from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.exceptions import InvalidEndpointState -from .types import AbstractDeploymentStrategy, StrategyCycleResult +from .types import AbstractDeploymentStrategy, RouteChanges, StrategyCycleResult + +log = BraceStyleAdapter(logging.getLogger(__name__)) + + +@dataclass +class _ClassifiedRoutes: + """Routes classified by revision and 
status. + + Only ``blue_active`` and ``green_healthy`` retain full RouteInfo + (needed for drain/promote route IDs and delay calculation). + Other buckets store counts only. + """ + + blue_active: list[RouteInfo] = field(default_factory=list) + green_healthy: list[RouteInfo] = field(default_factory=list) + green_provisioning_count: int = 0 + green_unhealthy_count: int = 0 + green_failed_count: int = 0 + + @property + def total_green_running(self) -> int: + """Count of green-revision routes whose processes are still running. + + Includes provisioning and unhealthy routes to prevent duplicate route + creation. (Unhealthy routes are excluded from promotion decisions + since they cannot serve traffic reliably.) + """ + return self.green_provisioning_count + len(self.green_healthy) + self.green_unhealthy_count class BlueGreenStrategy(AbstractDeploymentStrategy): @@ -28,5 +70,150 @@ def evaluate_cycle( routes: Sequence[RouteInfo], spec: DeploymentStrategySpec, ) -> StrategyCycleResult: - """Evaluate one cycle of blue-green deployment for a single deployment.""" - raise NotImplementedError("Blue-green deployment strategy is not yet implemented") + """Evaluate one cycle of blue-green deployment for a single deployment. + + FSM flow: + 1. Classify routes into blue (old) / green (new) by revision_id. + 2. If at least ``desired`` green routes are healthy → AWAITING_PROMOTION + (regardless of auto_promote; this method never returns COMPLETED). + 3. Otherwise → PROVISIONING (create routes / wait for readiness). + + Rollback is not decided by the FSM — the coordinator's timeout + sweep handles it by transitioning to ROLLING_BACK when the + deploying timeout is exceeded. 
+ """ + if not isinstance(spec, BlueGreenSpec): + raise TypeError(f"Expected BlueGreenSpec, got {type(spec).__name__}") + desired = deployment.replica_spec.target_replica_count + deploying_revision = deployment.deploying_revision_id + if deploying_revision is None: + raise InvalidEndpointState( + f"Deployment {deployment.id} has DEPLOYING lifecycle but deploying_revision_id is None. " + "This indicates an inconsistent state — the deployment will be skipped." + ) + + classified = self._classify_routes(routes, deploying_revision) + + log.debug( + "deployment {}: sub_step={}, routes total={}, " + "blue_active={}, green_prov={}, green_healthy={}, green_unhealthy={}, green_failed={}", + deployment.id, + deployment.sub_step, + len(routes), + len(classified.blue_active), + classified.green_provisioning_count, + len(classified.green_healthy), + classified.green_unhealthy_count, + classified.green_failed_count, + ) + + if result := self._check_awaiting_promotion(deployment, classified, desired): + return result + return self._compute_route_mutations(deployment, classified, desired) + + def _classify_routes( + self, + routes: Sequence[RouteInfo], + deploying_revision: uuid.UUID, + ) -> _ClassifiedRoutes: + """Classify routes into blue (old) / green (new) buckets.""" + classified = _ClassifiedRoutes() + for route in routes: + is_green = route.revision_id == deploying_revision + if not is_green: + if route.status.is_active(): + classified.blue_active.append(route) + continue + + if route.status == RouteStatus.PROVISIONING: + classified.green_provisioning_count += 1 + elif route.status.is_inactive(): + classified.green_failed_count += 1 + elif route.health_status == RouteHealthStatus.HEALTHY: + classified.green_healthy.append(route) + else: + classified.green_unhealthy_count += 1 + return classified + + def _check_awaiting_promotion( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + ) -> StrategyCycleResult | None: + """Return 
AWAITING_PROMOTION once at least ``desired`` green routes are healthy, else None.""" + if len(classified.green_healthy) < desired: + return None + log.debug( + "deployment {}: all green healthy, awaiting promotion", + deployment.id, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION) + + def _compute_route_mutations( + self, + deployment: DeploymentInfo, + classified: _ClassifiedRoutes, + desired: int, + ) -> StrategyCycleResult: + """Return PROVISIONING — create green routes or wait for readiness. + + This is the default fallback when AWAITING_PROMOTION + conditions are not met. + """ + # No green routes at all → create the full set as INACTIVE + if classified.total_green_running == 0 and not classified.green_failed_count: + log.debug( + "deployment {}: no green routes — creating {} INACTIVE routes", + deployment.id, + desired, + ) + return StrategyCycleResult( + sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, + route_changes=RouteChanges( + rollout_specs=_build_green_route_creators(deployment, desired), + ), + ) + + # Green routes still provisioning or not enough healthy → wait + log.debug( + "deployment {}: green healthy={}/{}, provisioning={} — waiting", + deployment.id, + len(classified.green_healthy), + desired, + classified.green_provisioning_count, + ) + return StrategyCycleResult(sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING) + + +def _build_green_route_creators( + deployment: DeploymentInfo, + count: int, +) -> list[RBACEntityCreator[RoutingRow]]: + """Build route creator specs for green routes.""" + if deployment.deploying_revision_id is None: + raise InvalidEndpointState( + f"Deployment {deployment.id} has no deploying_revision_id; cannot build route creators." 
+ ) + creators: list[RBACEntityCreator[RoutingRow]] = [] + for _ in range(count): + spec = RouteCreatorSpec( + endpoint_id=deployment.id, + session_owner_id=deployment.metadata.session_owner, + domain=deployment.metadata.domain, + project_id=deployment.metadata.project, + revision_id=deployment.deploying_revision_id, + traffic_status=RouteTrafficStatus.INACTIVE, + traffic_ratio=0.0, + ) + creators.append( + RBACEntityCreator( + spec=spec, + element_type=RBACElementType.ROUTING, + scope_ref=RBACElementRef( + element_type=RBACElementType.MODEL_DEPLOYMENT, + element_id=str(deployment.id), + ), + ) + ) + return creators diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py index 261bb17927e..6a705c2b1be 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/evaluator.py @@ -149,6 +149,7 @@ async def evaluate( changes = cycle_result.route_changes result.route_changes.rollout_specs.extend(changes.rollout_specs) result.route_changes.drain_route_ids.extend(changes.drain_route_ids) + result.route_changes.promote_route_ids.extend(changes.promote_route_ids) self._record_route_changes(deployment, changes) # Classify into assignments @@ -157,8 +158,12 @@ async def evaluate( return result def _record_route_changes(self, deployment: DeploymentInfo, changes: RouteChanges) -> None: - """Record rollout/drain operations as sub-steps for observability.""" - if not changes.rollout_specs and not changes.drain_route_ids: + """Record rollout/drain/promote operations as sub-steps for observability.""" + if ( + not changes.rollout_specs + and not changes.drain_route_ids + and not changes.promote_route_ids + ): return pool = DeploymentRecorderContext.current_pool() recorder = pool.recorder(deployment.id) @@ -169,6 +174,12 @@ def _record_route_changes(self, deployment: DeploymentInfo, changes: RouteChange 
success_detail=f"{len(changes.rollout_specs)} new route(s)", ): pass + if changes.promote_route_ids: + with recorder.step( + "promote", + success_detail=f"{len(changes.promote_route_ids)} route(s)", + ): + pass if changes.drain_route_ids: with recorder.step( "drain", diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/types.py b/src/ai/backend/manager/sokovan/deployment/strategy/types.py index c4357cde8fd..109e7d332f8 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/types.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/types.py @@ -26,13 +26,15 @@ class RouteChanges: rollout_specs: list[RBACEntityCreator[RoutingRow]] = field(default_factory=list) drain_route_ids: list[UUID] = field(default_factory=list) + promote_route_ids: list[UUID] = field(default_factory=list) @dataclass class StrategyCycleResult: """Result of evaluating a single deployment's strategy cycle. - ``sub_step`` indicates the next state: PROVISIONING or COMPLETED. + ``sub_step`` indicates the next state: PROVISIONING, + AWAITING_PROMOTION, or COMPLETED. 
""" sub_step: DeploymentLifecycleSubStep diff --git a/tests/unit/manager/repositories/deployment/test_deployment_repository.py b/tests/unit/manager/repositories/deployment/test_deployment_repository.py index 786c343972f..b431bacf2c1 100644 --- a/tests/unit/manager/repositories/deployment/test_deployment_repository.py +++ b/tests/unit/manager/repositories/deployment/test_deployment_repository.py @@ -3515,57 +3515,57 @@ async def test_image_id( async def test_domain( self, db_with_cleanup: ExtendedAsyncSAEngine, - ) -> DomainRow: + ) -> AsyncGenerator[DomainRow, None]: """Create test domain.""" + domain = DomainRow( + name=f"test-domain-{uuid.uuid4().hex[:8]}", + description="Test domain", + is_active=True, + total_resource_slots=ResourceSlot(), + allowed_vfolder_hosts={}, + allowed_docker_registries=[], + ) async with db_with_cleanup.begin_session() as db_sess: - domain = DomainRow( - name=f"test-domain-{uuid.uuid4().hex[:8]}", - description="Test domain", - is_active=True, - total_resource_slots=ResourceSlot(), - allowed_vfolder_hosts={}, - allowed_docker_registries=[], - ) db_sess.add(domain) await db_sess.commit() - return domain + yield domain @pytest.fixture async def test_scaling_group( self, db_with_cleanup: ExtendedAsyncSAEngine, - ) -> ScalingGroupRow: + ) -> AsyncGenerator[ScalingGroupRow, None]: """Create test scaling group.""" + sgroup = ScalingGroupRow( + name=f"test-sgroup-{uuid.uuid4().hex[:8]}", + description="Test scaling group", + is_active=True, + driver="static", + driver_opts={}, + scheduler="fifo", + scheduler_opts=ScalingGroupOpts(), + ) async with db_with_cleanup.begin_session() as db_sess: - sgroup = ScalingGroupRow( - name=f"test-sgroup-{uuid.uuid4().hex[:8]}", - description="Test scaling group", - is_active=True, - driver="static", - driver_opts={}, - scheduler="fifo", - scheduler_opts=ScalingGroupOpts(), - ) db_sess.add(sgroup) await db_sess.commit() - return sgroup + yield sgroup @pytest.fixture async def default_project_policy( self, 
db_with_cleanup: ExtendedAsyncSAEngine, - ) -> ProjectResourcePolicyRow: + ) -> AsyncGenerator[ProjectResourcePolicyRow, None]: """Create default project resource policy.""" + policy = ProjectResourcePolicyRow( + name="default", + max_vfolder_count=10, + max_quota_scope_size=0, + max_network_count=0, + ) async with db_with_cleanup.begin_session() as db_sess: - policy = ProjectResourcePolicyRow( - name="default", - max_vfolder_count=10, - max_quota_scope_size=0, - max_network_count=0, - ) db_sess.add(policy) await db_sess.commit() - return policy + yield policy @pytest.fixture async def test_group( @@ -3573,21 +3573,21 @@ async def test_group( db_with_cleanup: ExtendedAsyncSAEngine, test_domain: DomainRow, default_project_policy: ProjectResourcePolicyRow, - ) -> GroupRow: + ) -> AsyncGenerator[GroupRow, None]: """Create test group (project).""" + group = GroupRow( + id=uuid.uuid4(), + name=f"test-group-{uuid.uuid4().hex[:8]}", + domain_name=test_domain.name, + description="Test group", + is_active=True, + total_resource_slots=ResourceSlot(), + resource_policy=default_project_policy.name, + ) async with db_with_cleanup.begin_session() as db_sess: - group = GroupRow( - id=uuid.uuid4(), - name=f"test-group-{uuid.uuid4().hex[:8]}", - domain_name=test_domain.name, - description="Test group", - is_active=True, - total_resource_slots=ResourceSlot(), - resource_policy=default_project_policy.name, - ) db_sess.add(group) await db_sess.commit() - return group + yield group @pytest.fixture async def different_group( @@ -3595,21 +3595,21 @@ async def different_group( db_with_cleanup: ExtendedAsyncSAEngine, test_domain: DomainRow, default_project_policy: ProjectResourcePolicyRow, - ) -> GroupRow: + ) -> AsyncGenerator[GroupRow, None]: """Create a different group (project) for cross-project tests.""" + group = GroupRow( + id=uuid.uuid4(), + name=f"different-group-{uuid.uuid4().hex[:8]}", + domain_name=test_domain.name, + description="Different group", + is_active=True, + 
total_resource_slots=ResourceSlot(), + resource_policy=default_project_policy.name, + ) async with db_with_cleanup.begin_session() as db_sess: - group = GroupRow( - id=uuid.uuid4(), - name=f"different-group-{uuid.uuid4().hex[:8]}", - domain_name=test_domain.name, - description="Different group", - is_active=True, - total_resource_slots=ResourceSlot(), - resource_policy=default_project_policy.name, - ) db_sess.add(group) await db_sess.commit() - return group + yield group @pytest.fixture def deployment_repository( diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py index 13c216587e7..8e88549dc06 100644 --- a/tests/unit/manager/sokovan/deployment/strategy/test_applier.py +++ b/tests/unit/manager/sokovan/deployment/strategy/test_applier.py @@ -70,47 +70,38 @@ def summary_with_rollout() -> StrategyEvaluationSummary: @pytest.fixture def summary_with_drain() -> StrategyEvaluationSummary: return _build_summary( - {uuid4(): DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING}, + {uuid4(): DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION}, route_changes=RouteChanges(drain_route_ids=[uuid4()]), ) @pytest.fixture def completed_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: - ep_id_1 = uuid4() - ep_id_2 = uuid4() - completed_ids = {ep_id_1, ep_id_2} + endpoint_id_1 = uuid4() + endpoint_id_2 = uuid4() + completed_ids = {endpoint_id_1, endpoint_id_2} summary = _build_summary({ - ep_id_1: DeploymentLifecycleSubStep.DEPLOYING_COMPLETED, - ep_id_2: DeploymentLifecycleSubStep.DEPLOYING_COMPLETED, + endpoint_id_1: DeploymentLifecycleSubStep.DEPLOYING_COMPLETED, + endpoint_id_2: DeploymentLifecycleSubStep.DEPLOYING_COMPLETED, }) return summary, completed_ids @pytest.fixture -def rolled_back_summary() -> tuple[StrategyEvaluationSummary, set[UUID]]: - ep_id = uuid4() - summary = _build_summary({ep_id: DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING}) - return summary, {ep_id} - - 
-@pytest.fixture -def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID, UUID]: +def mixed_summary() -> tuple[StrategyEvaluationSummary, UUID, UUID]: provisioning_id = uuid4() completed_id = uuid4() - rolled_back_id = uuid4() summary = _build_summary( { provisioning_id: DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, completed_id: DeploymentLifecycleSubStep.DEPLOYING_COMPLETED, - rolled_back_id: DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, }, route_changes=RouteChanges( rollout_specs=[MagicMock()], drain_route_ids=[uuid4()], ), ) - return summary, provisioning_id, completed_id, rolled_back_id + return summary, provisioning_id, completed_id # ============================================================================= @@ -211,9 +202,9 @@ async def test_mixed_assignments_handles_all( self, applier: StrategyResultApplier, mock_deployment_repo: AsyncMock, - mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID, UUID], + mixed_summary: tuple[StrategyEvaluationSummary, UUID, UUID], ) -> None: - summary, _provisioning_id, completed_id, _rolled_back_id = mixed_summary + summary, _provisioning_id, completed_id = mixed_summary mock_deployment_repo.apply_strategy_mutations.return_value = 1 result = await applier.apply(summary) diff --git a/tests/unit/manager/sokovan/deployment/strategy/test_blue_green.py b/tests/unit/manager/sokovan/deployment/strategy/test_blue_green.py new file mode 100644 index 00000000000..cc657e5689f --- /dev/null +++ b/tests/unit/manager/sokovan/deployment/strategy/test_blue_green.py @@ -0,0 +1,444 @@ +"""Tests for the blue-green FSM evaluation (BEP-1049). + +Tests cover: +- FSM state transitions: PROVISIONING, AWAITING_PROMOTION +- Route status classification (HEALTHY, UNHEALTHY, DEGRADED, FAILED_TO_START, …) +- Edge cases and boundary conditions + +Note: Rollback is not decided by the FSM — the coordinator's timeout +sweep handles it. The FSM only returns PROVISIONING or AWAITING_PROMOTION. 
+Actual promotion (auto or manual) is handled by AwaitingPromotionHandler. +""" + +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import pytest + +from ai.backend.common.data.endpoint.types import EndpointLifecycle +from ai.backend.common.dto.manager.v2.deployment.types import IntOrPercent +from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, + DeploymentLifecycleSubStep, + DeploymentMetadata, + DeploymentNetworkSpec, + DeploymentState, + ReplicaSpec, + RouteHealthStatus, + RouteInfo, + RouteStatus, + RouteTrafficStatus, +) +from ai.backend.manager.models.deployment_policy import BlueGreenSpec, RollingUpdateSpec +from ai.backend.manager.repositories.deployment.creators import RouteCreatorSpec +from ai.backend.manager.sokovan.deployment.strategy.blue_green import BlueGreenStrategy + +ENDPOINT_ID = UUID("aaaaaaaa-0000-0000-0000-aaaaaaaaaaaa") +OLD_REV = UUID("11111111-1111-1111-1111-111111111111") +NEW_REV = UUID("22222222-2222-2222-2222-222222222222") +PROJECT_ID = UUID("cccccccc-cccc-cccc-cccc-cccccccccccc") +USER_ID = UUID("dddddddd-dddd-dddd-dddd-dddddddddddd") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def make_deployment( + *, + desired: int = 2, + deploying_revision_id: UUID = NEW_REV, + current_revision_id: UUID = OLD_REV, +) -> DeploymentInfo: + return DeploymentInfo( + id=ENDPOINT_ID, + metadata=DeploymentMetadata( + name="test-deploy", + domain="default", + project=PROJECT_ID, + resource_group="default", + created_user=USER_ID, + session_owner=USER_ID, + created_at=datetime.now(UTC), + revision_history_limit=5, + ), + state=DeploymentState( + lifecycle=EndpointLifecycle.DEPLOYING, + retry_count=0, + ), + replica_spec=ReplicaSpec( + replica_count=desired, + desired_replica_count=desired, + ), + network=DeploymentNetworkSpec(open_to_public=False), 
+ model_revisions=[], + current_revision_id=current_revision_id, + deploying_revision_id=deploying_revision_id, + ) + + +def make_route( + *, + revision_id: UUID, + status: RouteStatus = RouteStatus.RUNNING, + health_status: RouteHealthStatus = RouteHealthStatus.HEALTHY, + created_at: datetime | None = None, +) -> RouteInfo: + is_active = status.is_active() + return RouteInfo( + route_id=uuid4(), + endpoint_id=ENDPOINT_ID, + session_id=None, + status=status, + health_status=health_status, + traffic_ratio=1.0 if is_active else 0.0, + created_at=created_at or datetime.now(UTC), + revision_id=revision_id, + traffic_status=RouteTrafficStatus.ACTIVE if is_active else RouteTrafficStatus.INACTIVE, + ) + + +# =========================================================================== +# 1. Basic FSM states +# =========================================================================== + + +class TestBasicFSMStates: + """Test fundamental FSM transitions: PROVISIONING and AWAITING_PROMOTION.""" + + def test_no_routes_creates_green(self) -> None: + """First cycle with 0 routes → PROVISIONING with INACTIVE green route creation.""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0) + + result = BlueGreenStrategy().evaluate_cycle(deployment, [], spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 2 + + def test_only_blue_routes_creates_green(self) -> None: + """Only old-revision routes exist → create all green routes.""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING), + make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING), + ] + + result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert 
len(result.route_changes.rollout_specs) == 2 + + def test_green_provisioning_waits(self) -> None: + """Green routes in PROVISIONING → wait (no create, no drain).""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING), + make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING), + ] + + result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_all_green_healthy_auto_promote_awaits(self) -> None: + """All green HEALTHY + auto_promote → AWAITING_PROMOTION (handler does actual promote).""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0) + routes = [ + make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING), + make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING), + make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING), + make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING), + ] + + result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION + assert len(result.route_changes.promote_route_ids) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + def test_all_green_healthy_manual_promote_awaits(self) -> None: + """All green HEALTHY + manual promote → AWAITING_PROMOTION (no promote/drain).""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=False) + routes = [ + make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING), + make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING), + ] + + result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == 
DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION + assert len(result.route_changes.promote_route_ids) == 0 + assert len(result.route_changes.drain_route_ids) == 0 + + @pytest.mark.parametrize( + "failed_status", + [ + pytest.param(RouteStatus.FAILED_TO_START, id="failed_to_start"), + pytest.param(RouteStatus.TERMINATED, id="terminated"), + ], + ) + def test_all_green_failed_waits_for_timeout(self, failed_status: RouteStatus) -> None: + """All green routes failed → PROVISIONING (coordinator timeout handles rollback).""" + deployment = make_deployment(desired=2) + spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0) + routes = [ + make_route(revision_id=NEW_REV, status=failed_status), + make_route(revision_id=NEW_REV, status=failed_status), + ] + + result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec) + + assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING + assert len(result.route_changes.rollout_specs) == 0 + + +# =========================================================================== +# 2. 
# Route status classification
# ===========================================================================


class TestRouteStatusClassification:
    """Test how different route statuses affect classification."""

    def test_degraded_green_treated_as_unhealthy(self) -> None:
        """One HEALTHY plus one DEGRADED green route keeps the cycle in PROVISIONING."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, health_status=RouteHealthStatus.DEGRADED),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING

    def test_not_checked_green_treated_as_unhealthy(self) -> None:
        """One HEALTHY plus one NOT_CHECKED green route keeps the cycle in PROVISIONING."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(
                revision_id=NEW_REV,
                status=RouteStatus.RUNNING,
                health_status=RouteHealthStatus.NOT_CHECKED,
            ),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING

    def test_unhealthy_green_blocks_promotion(self) -> None:
        """An UNHEALTHY green route blocks promotion and does not trigger new rollouts."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, health_status=RouteHealthStatus.UNHEALTHY),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING
        assert len(result.route_changes.rollout_specs) == 0

    def test_unhealthy_green_prevents_duplicate_creation(self) -> None:
        """Two RUNNING-but-UNHEALTHY green routes must not cause extra green routes to be created."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, health_status=RouteHealthStatus.UNHEALTHY),
            make_route(revision_id=NEW_REV, health_status=RouteHealthStatus.UNHEALTHY),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING
        assert len(result.route_changes.rollout_specs) == 0

    @pytest.mark.parametrize(
        "inactive_status",
        [
            pytest.param(RouteStatus.TERMINATING, id="terminating"),
            pytest.param(RouteStatus.TERMINATED, id="terminated"),
        ],
    )
    def test_blue_inactive_not_counted_as_active(self, inactive_status: RouteStatus) -> None:
        """A TERMINATING/TERMINATED blue route does not stop healthy green from awaiting promotion."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=OLD_REV, status=inactive_status),
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION

    def test_partial_green_failure_continues_waiting(self) -> None:
        """Two healthy green plus one FAILED_TO_START (desired=3) stays in PROVISIONING."""
        deployment = make_deployment(desired=3)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, status=RouteStatus.FAILED_TO_START),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING


# ===========================================================================
# 3. Edge cases
# ===========================================================================


class TestEdgeCases:
    """Edge cases and boundary conditions."""

    def test_single_replica_deployment(self) -> None:
        """desired=1 with 1 blue + 1 green healthy → AWAITING_PROMOTION."""
        deployment = make_deployment(desired=1)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING),
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION

    def test_partial_green_healthy_waits(self) -> None:
        """1 of 2 green healthy → PROVISIONING (waiting for the other)."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING),
        ]

        result = BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

        assert result.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING

    def test_deploying_revision_none_rejected(self) -> None:
        """If deploying_revision_id is None, evaluate_cycle raises."""
        deployment = make_deployment(desired=1, deploying_revision_id=None)  # type: ignore[arg-type]
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        routes = [make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING)]

        # NOTE(review): catching bare ``Exception`` would also accept unrelated
        # failures (e.g. an AttributeError). Presumably the intended type is
        # InvalidEndpointState — narrow this once its import path is confirmed.
        with pytest.raises(Exception):  # InvalidEndpointState
            BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)

    def test_wrong_spec_type_rejected(self) -> None:
        """Passing non-BlueGreenSpec raises TypeError."""
        deployment = make_deployment(desired=1)
        spec = RollingUpdateSpec(
            max_surge=IntOrPercent(count=1), max_unavailable=IntOrPercent(count=0)
        )
        routes: list[RouteInfo] = []

        with pytest.raises(TypeError, match="Expected BlueGreenSpec"):
            BlueGreenStrategy().evaluate_cycle(deployment, routes, spec)


# ===========================================================================
# 4. Route creator specs validation
# ===========================================================================


class TestRouteCreatorSpecs:
    """Validate that route creator specs have correct fields."""

    def test_creator_specs_use_deploying_revision(self) -> None:
        """Created green routes should use the deploying revision metadata."""
        deployment = make_deployment(desired=1)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)

        result = BlueGreenStrategy().evaluate_cycle(deployment, [], spec)

        assert len(result.route_changes.rollout_specs) == 1
        creator_spec = result.route_changes.rollout_specs[0].spec
        assert isinstance(creator_spec, RouteCreatorSpec)
        assert creator_spec.revision_id == NEW_REV
        assert creator_spec.endpoint_id == ENDPOINT_ID
        assert creator_spec.session_owner_id == USER_ID
        assert creator_spec.domain == "default"
        assert creator_spec.project_id == PROJECT_ID

    def test_green_routes_created_as_inactive(self) -> None:
        """Green routes must be created with INACTIVE traffic status and ratio 0.0."""
        deployment = make_deployment(desired=2)
        spec = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)

        result = BlueGreenStrategy().evaluate_cycle(deployment, [], spec)

        rollout_specs = result.route_changes.rollout_specs
        # Guard against a vacuous pass: if no specs were produced, the loop
        # below would run zero times and the test would succeed without
        # checking anything. desired=2 with no existing routes must yield 2.
        assert len(rollout_specs) == 2
        for creator in rollout_specs:
            creator_spec = creator.spec
            assert isinstance(creator_spec, RouteCreatorSpec)
            assert creator_spec.traffic_status == RouteTrafficStatus.INACTIVE
            assert creator_spec.traffic_ratio == 0.0


# ===========================================================================
# 5.
# Realistic multi-step scenario (desired=3)
# ===========================================================================


class TestRealisticScenario:
    """Walk one blue-green deployment through several consecutive evaluation cycles."""

    def test_step_by_step_blue_green(self) -> None:
        """Drive desired=3 from "only blue" through provisioning to AWAITING_PROMOTION."""
        policy = BlueGreenSpec(auto_promote=True, promote_delay_seconds=0)
        info = make_deployment(desired=3)
        blues = [make_route(revision_id=OLD_REV, status=RouteStatus.RUNNING) for _ in range(3)]

        # Cycle 1: three blue routes and no green -> three rollout specs requested.
        first = BlueGreenStrategy().evaluate_cycle(info, blues, policy)
        assert first.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING
        assert len(first.route_changes.rollout_specs) == 3

        # Cycle 2: all three green routes are still PROVISIONING -> wait, create nothing.
        greens_starting = [
            make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING) for _ in range(3)
        ]
        second = BlueGreenStrategy().evaluate_cycle(info, blues + greens_starting, policy)
        assert second.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING
        assert len(second.route_changes.rollout_specs) == 0

        # Cycle 3: two green routes healthy, one still PROVISIONING -> keep waiting.
        greens_mixed = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING) for _ in range(2)
        ]
        greens_mixed.append(make_route(revision_id=NEW_REV, status=RouteStatus.PROVISIONING))
        third = BlueGreenStrategy().evaluate_cycle(info, blues + greens_mixed, policy)
        assert third.sub_step == DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING

        # Cycle 4: all three green routes healthy -> AWAITING_PROMOTION.
        greens_ready = [
            make_route(revision_id=NEW_REV, status=RouteStatus.RUNNING) for _ in range(3)
        ]
        fourth = BlueGreenStrategy().evaluate_cycle(info, blues + greens_ready, policy)
        assert fourth.sub_step == DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION