Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/10050.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement Blue-Green deployment strategy
10 changes: 10 additions & 0 deletions src/ai/backend/manager/data/deployment/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,16 @@ def termination_priority(self) -> int:
return 0
return _HEALTH_TERMINATION_PRIORITY.get(self.health_status, 0)

@property
def is_ready(self) -> bool:
    """Tell whether this route is both RUNNING and HEALTHY.

    A ready route is eligible to receive traffic (subject to
    traffic_status).

    Returns:
        True only when ``status`` is ``RouteStatus.RUNNING`` and
        ``health_status`` is ``RouteHealthStatus.HEALTHY``.
    """
    # Guard on the lifecycle status first; health is only consulted for
    # routes that are actually running.
    if self.status == RouteStatus.RUNNING:
        return self.health_status == RouteHealthStatus.HEALTHY
    return False


@dataclass
class DeploymentInfoWithRoutes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,38 @@ async def _complete_deployment_revision_swap(
result = await db_sess.execute(query)
return cast(CursorResult[Any], result).rowcount

async def complete_manual_promote(
    self,
    deployment_id: uuid.UUID,
    promote: BatchUpdater[RoutingRow] | None,
    drain: BatchUpdater[RoutingRow] | None,
) -> None:
    """Finalize a manually-promoted deployment in a single transaction.

    A manual promote does not go through the FSM coordinator, so this
    method applies every state change the coordinator would otherwise
    perform for an auto-promoted deployment: the route mutations, the
    revision swap, clearing ``sub_step``, and the DEPLOYING → READY
    lifecycle transition.

    Args:
        deployment_id: Endpoint whose deploying revision becomes current.
        promote: Batch update handed to ``_promote_routes``; may be None.
        drain: Batch update handed to ``_drain_routes``; may be None.
    """
    # Only an endpoint that still has a pending revision is eligible
    # for the swap.
    has_pending_revision = EndpointRow.deploying_revision.is_not(None)
    finalize_stmt = (
        sa.update(EndpointRow)
        .where(EndpointRow.id == deployment_id, has_pending_revision)
        .values(
            current_revision=EndpointRow.deploying_revision,
            deploying_revision=None,
            sub_step=None,
            lifecycle_stage=EndpointLifecycle.READY,
        )
    )

    async with self._begin_session_read_committed() as session:
        # Route changes go first, then the endpoint row is finalized.
        await self._promote_routes(session, promote)
        await self._drain_routes(session, drain)
        # NOTE(review): the UPDATE result is not inspected, so an endpoint
        # without a deploying_revision is silently left unchanged here —
        # confirm callers validate that precondition.
        await session.execute(finalize_stmt)

async def clear_deploying_revision(
self,
deployment_ids: set[uuid.UUID],
Expand Down
21 changes: 10 additions & 11 deletions src/ai/backend/manager/repositories/deployment/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,14 +1630,14 @@ async def promote_deployment(
deployment_id: UUID,
promote_route_ids: list[UUID],
drain_route_ids: list[UUID],
) -> int:
"""Promote a deployment by switching traffic from blue to green routes.

Atomically promotes green routes (→ACTIVE), drains blue routes (→TERMINATING),
and swaps deploying_revision → current_revision.
) -> None:
"""Finalize a manually-promoted blue-green deployment.

Returns:
Number of deployments whose revision was swapped (0 or 1).
Atomically promotes green routes (→ACTIVE), drains blue routes
(→TERMINATING), swaps deploying_revision → current_revision, and
transitions lifecycle DEPLOYING → READY. Manual promote bypasses
the FSM coordinator, so the lifecycle transition must happen here
rather than in a later handler tick.
"""
promote: BatchUpdater[RoutingRow] | None = None
if promote_route_ids:
Expand All @@ -1660,11 +1660,10 @@ async def promote_deployment(
conditions=[RouteIdConditions.by_ids(drain_route_ids)],
)

return await self._db_source.apply_strategy_mutations(
rollout=[],
drain=drain,
await self._db_source.complete_manual_promote(
deployment_id=deployment_id,
promote=promote,
completed_ids={deployment_id},
drain=drain,
)

@deployment_repository_resilience.apply()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
DeploymentLifecycleStatus,
DeploymentLifecycleSubStep,
DeploymentStatusTransitions,
RouteHealthStatus,
RouteStatus,
)
from ai.backend.manager.data.model_serving.types import EndpointLifecycle
Expand Down Expand Up @@ -342,7 +341,7 @@ async def _classify_routes(
drain_route_ids: list[uuid.UUID] = []
for route in route_search.items:
if route.revision_id == info.deploying_revision_id:
if route.health_status == RouteHealthStatus.HEALTHY:
if route.is_ready:
promote_route_ids.append(route.route_id)
elif route.status.is_active():
drain_route_ids.append(route.route_id)
Expand Down
29 changes: 24 additions & 5 deletions src/ai/backend/manager/sokovan/deployment/strategy/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,27 @@ class StrategyApplyResult:
routes_drained: int = 0
"""Number of routes marked for draining."""

routes_promoted: int = 0
"""Number of routes promoted (traffic_status -> ACTIVE)."""

def has_mutations(self) -> bool:
    """Check if there are any route mutations to persist.

    Returns True when at least one of the following is present:
    new routes to roll out, routes to drain, routes to promote, or
    deployments completed.
    """
    # The promoted-route count must take part in the check; otherwise a
    # cycle that only promotes routes would be reported as a no-op.
    return bool(
        self.completed_ids
        or self.routes_created
        or self.routes_drained
        or self.routes_promoted
    )


class StrategyResultApplier:
"""Applies a ``StrategyEvaluationSummary`` to the database.

Handles route mutations from a strategy evaluation cycle:
1. Route rollout (create) and drain (terminate)
2. Route promotion (traffic_status -> ACTIVE)
3. Revision swap for COMPLETED deployments

Sub-step transitions are handled exclusively by the coordinator.
"""
Expand All @@ -69,6 +75,7 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult
completed_ids=completed_ids,
routes_created=len(changes.rollout_specs),
routes_drained=len(changes.drain_route_ids),
routes_promoted=len(changes.promote_route_ids),
)

drain: BatchUpdater[RoutingRow] | None = None
Expand All @@ -82,6 +89,16 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult
conditions=[RouteConditions.by_ids(changes.drain_route_ids)],
)

promote: BatchUpdater[RoutingRow] | None = None
if changes.promote_route_ids:
promote = BatchUpdater(
spec=RouteBatchUpdaterSpec(
traffic_ratio=1.0,
traffic_status=RouteTrafficStatus.ACTIVE,
),
conditions=[RouteConditions.by_ids(changes.promote_route_ids)],
)

rollout = changes.rollout_specs

if not result.has_mutations():
Expand All @@ -90,14 +107,16 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult
swapped = await self._deployment_repo.apply_strategy_mutations(
rollout=rollout,
drain=drain,
promote=None,
promote=promote,
completed_ids=completed_ids,
)

log.debug(
"Applied evaluation: {} routes created, {} routes drained",
"Applied evaluation: {} assignments, {} routes created, {} routes drained, {} routes promoted",
len(summary.assignments),
result.routes_created,
result.routes_drained,
result.routes_promoted,
)
if completed_ids:
log.info(
Expand Down
Loading
Loading