diff --git a/changes/10426.feature.md b/changes/10426.feature.md new file mode 100644 index 00000000000..024111d4581 --- /dev/null +++ b/changes/10426.feature.md @@ -0,0 +1 @@ +Add blue-green deployment infrastructure and promote API diff --git a/docs/manager/graphql-reference/supergraph.graphql b/docs/manager/graphql-reference/supergraph.graphql index d6e1a960923..086fbb9c50d 100644 --- a/docs/manager/graphql-reference/supergraph.graphql +++ b/docs/manager/graphql-reference/supergraph.graphql @@ -10207,6 +10207,11 @@ type Mutation """ activateDeploymentRevision(input: ActivateRevisionInput!): ActivateRevisionPayload! @join__field(graph: STRAWBERRY) + """ + Added in UNRELEASED. Manually promote a blue-green deployment awaiting promotion. When `auto_promote=false`, the deployment stays in AWAITING_PROMOTION once all Green routes are healthy until an operator triggers this mutation, which atomically switches traffic (Green→ACTIVE, Blue→INACTIVE→TERMINATING). + """ + promoteDeployment(input: PromoteDeploymentInput!): PromoteDeploymentPayload! @join__field(graph: STRAWBERRY) + """Added in 25.19.0. Update the traffic status of a route""" updateRouteTrafficStatus(input: UpdateRouteTrafficStatusInput!): UpdateRouteTrafficStatusPayload! @join__field(graph: STRAWBERRY) @@ -12140,6 +12145,23 @@ input ProjectWeightInputItem weight: Decimal = null } +""" +Added in UNRELEASED. Input for manually promoting a blue-green deployment. +""" +input PromoteDeploymentInput + @join__type(graph: STRAWBERRY) +{ + deploymentId: ID! +} + +"""Added in UNRELEASED. Result of manually promoting a deployment.""" +type PromoteDeploymentPayload + @join__type(graph: STRAWBERRY) +{ + """The promoted deployment""" + deployment: ModelDeployment! +} + """ Completely delete domain from DB. diff --git a/docs/manager/graphql-reference/v2-schema.graphql b/docs/manager/graphql-reference/v2-schema.graphql index 704b9691394..d3bddacc067 100644 --- a/docs/manager/graphql-reference/v2-schema.graphql +++ b/docs/manager/graphql-reference/v2-schema.graphql @@ -6209,6 +6209,11 @@ type Mutation { """ activateDeploymentRevision(input: ActivateRevisionInput!): ActivateRevisionPayload! + """ + Added in UNRELEASED. Manually promote a blue-green deployment awaiting promotion. When `auto_promote=false`, the deployment stays in AWAITING_PROMOTION once all Green routes are healthy until an operator triggers this mutation, which atomically switches traffic (Green→ACTIVE, Blue→INACTIVE→TERMINATING). + """ + promoteDeployment(input: PromoteDeploymentInput!): PromoteDeploymentPayload! + """Added in 25.19.0. Update the traffic status of a route""" updateRouteTrafficStatus(input: UpdateRouteTrafficStatusInput!): UpdateRouteTrafficStatusPayload! @@ -7849,6 +7854,19 @@ input ProjectWeightInputItem { weight: Decimal = null } +""" +Added in UNRELEASED. Input for manually promoting a blue-green deployment. +""" +input PromoteDeploymentInput { + deploymentId: ID! +} + +"""Added in UNRELEASED. Result of manually promoting a deployment.""" +type PromoteDeploymentPayload { + """The promoted deployment""" + deployment: ModelDeployment! +} + """Added in 26.4.2. Payload for domain permanent deletion mutation.""" type PurgeDomainPayloadGQL { """Whether the purge was successful.""" diff --git a/src/ai/backend/client/cli/v2/deployment/commands.py b/src/ai/backend/client/cli/v2/deployment/commands.py index 8b88edc9207..93760a97353 100644 --- a/src/ai/backend/client/cli/v2/deployment/commands.py +++ b/src/ai/backend/client/cli/v2/deployment/commands.py @@ -226,6 +226,28 @@ async def _run() -> None: asyncio.run(_run()) +@deployment.command() +@click.argument("deployment_id", type=str) +def promote(deployment_id: str) -> None: + """Manually promote a blue-green deployment awaiting promotion.""" + + from ai.backend.common.dto.manager.v2.deployment.request import ( + PromoteDeploymentInput, + ) + + body = PromoteDeploymentInput(deployment_id=UUID(deployment_id)) + + async def _run() -> None: + registry = await create_v2_registry(load_v2_config()) + try: + result = await registry.deployment.promote(body) + print_result(result) + finally: + await registry.close() + + asyncio.run(_run()) + + @deployment.command() @click.argument("deployment_id", type=str) def delete(deployment_id: str) -> None: diff --git a/src/ai/backend/client/v2/domains_v2/deployment.py b/src/ai/backend/client/v2/domains_v2/deployment.py index e7c49a1ce18..c363086f9da 100644 --- a/src/ai/backend/client/v2/domains_v2/deployment.py +++ b/src/ai/backend/client/v2/domains_v2/deployment.py @@ -21,6 +21,7 @@ CreateDeploymentInput, DeleteAccessTokenInput, DeleteDeploymentInput, + PromoteDeploymentInput, SearchAccessTokensInput, SearchAutoScalingRulesInput, SearchDeploymentPoliciesInput, @@ -48,6 +49,7 @@ GetAccessTokenPayload, GetAutoScalingRulePayload, GetDeploymentPolicyPayload, + PromoteDeploymentPayload, ReplicaNode, RevisionNode, SearchAccessTokensPayload, @@ -233,6 +235,17 @@ async def activate_revision( response_model=ActivateRevisionPayload, ) + async def promote( + self, + body: PromoteDeploymentInput, + ) -> PromoteDeploymentPayload: + """Manually promote a blue-green deployment awaiting promotion.""" + return await self._client.typed_request( + "POST", + _PATH + f"/{body.deployment_id}/promote", + response_model=PromoteDeploymentPayload, + ) + # ------------------------------------------------------------------ # Replica operations # ------------------------------------------------------------------ diff --git a/src/ai/backend/common/dto/manager/v2/deployment/request.py b/src/ai/backend/common/dto/manager/v2/deployment/request.py index e7c7b279a9d..0d9e9debae8 100644 --- a/src/ai/backend/common/dto/manager/v2/deployment/request.py +++ b/src/ai/backend/common/dto/manager/v2/deployment/request.py @@ -850,6 +850,12 @@ class ActivateRevisionInput(BaseRequestModel): revision_id: UUID = Field(description="Revision ID to activate") +class PromoteDeploymentInput(BaseRequestModel): + """Input for manually promoting a blue-green deployment.""" + + deployment_id: UUID = Field(description="Deployment ID to promote") + + class UpdateRouteTrafficStatusInput(BaseRequestModel): """Input for updating a route's traffic status.""" diff --git a/src/ai/backend/common/dto/manager/v2/deployment/response.py b/src/ai/backend/common/dto/manager/v2/deployment/response.py index c5a4e2d33bd..a34077b7218 100644 --- a/src/ai/backend/common/dto/manager/v2/deployment/response.py +++ b/src/ai/backend/common/dto/manager/v2/deployment/response.py @@ -397,6 +397,12 @@ class ActivateRevisionPayload(BaseResponseModel): ) +class PromoteDeploymentPayload(BaseResponseModel): + """Payload for promote deployment mutation result.""" + + deployment: DeploymentNode = Field(description="The promoted deployment") + + class UpdateRouteTrafficStatusPayload(BaseResponseModel): """Payload for update route traffic status mutation result.""" diff --git a/src/ai/backend/manager/api/adapters/deployment.py b/src/ai/backend/manager/api/adapters/deployment.py index 0a27643ca67..e6574b44497 100644 --- a/src/ai/backend/manager/api/adapters/deployment.py +++ b/src/ai/backend/manager/api/adapters/deployment.py @@ -38,6 +38,7 @@ DeleteAccessTokenInput, DeleteDeploymentInput, DeploymentOrder, + PromoteDeploymentInput, ReplicaOrder, RevisionOrder, RouteOrder, @@ -70,6 +71,7 @@ GetAccessTokenPayload, GetAutoScalingRulePayload, GetDeploymentPolicyPayload, + PromoteDeploymentPayload, ReplicaNode, RevisionNode, RouteNode, @@ -276,6 +278,7 @@ ) from ai.backend.manager.services.deployment.actions.revision_operations import ( ActivateRevisionAction, + PromoteDeploymentAction, ) from ai.backend.manager.services.deployment.actions.route.search_routes import SearchRoutesAction from ai.backend.manager.services.deployment.actions.route.update_route_traffic_status import ( @@ -814,6 +817,15 @@ async def activate_revision(self, input: ActivateRevisionInput) -> ActivateRevis deployment_policy=self._policy_data_to_dto(action_result.deployment_policy), ) + async def promote_deployment(self, input: PromoteDeploymentInput) -> PromoteDeploymentPayload: + """Manually promote a blue-green deployment.""" + action_result = await self._processors.deployment.promote_deployment.wait_for_complete( + PromoteDeploymentAction(deployment_id=input.deployment_id) + ) + return PromoteDeploymentPayload( + deployment=self._deployment_data_to_dto(action_result.deployment), + ) + async def delete(self, input: DeleteDeploymentInput) -> DeleteDeploymentPayload: """Delete a deployment.""" await self._processors.deployment.destroy_deployment.wait_for_complete( diff --git a/src/ai/backend/manager/api/gql/deployment/__init__.py b/src/ai/backend/manager/api/gql/deployment/__init__.py index f7b7e9fea89..ae3515dc011 100644 --- a/src/ai/backend/manager/api/gql/deployment/__init__.py +++ b/src/ai/backend/manager/api/gql/deployment/__init__.py @@ -38,6 +38,7 @@ inference_runtime_configs, my_deployments, project_deployments, + promote_deployment, # Replica replica, replica_status_changed, @@ -126,6 +127,8 @@ ModelRuntimeConfigInput, MountPermission, ProjectDeploymentScopeGQL, + PromoteDeploymentInputGQL, + PromoteDeploymentPayloadGQL, ReadinessStatus, ReplicaFilter, ReplicaOrderBy, @@ -236,6 +239,8 @@ # Revision Types "ActivateRevisionInputGQL", "ActivateRevisionPayloadGQL", + "PromoteDeploymentInputGQL", + "PromoteDeploymentPayloadGQL", "AddRevisionInput", "AddRevisionPayload", "ClusterConfig", @@ -295,6 +300,7 @@ # Resolvers - Revision "activate_deployment_revision", "add_model_revision", + "promote_deployment", "inference_runtime_config", "inference_runtime_configs", "revision", diff --git a/src/ai/backend/manager/api/gql/deployment/resolver/__init__.py b/src/ai/backend/manager/api/gql/deployment/resolver/__init__.py index dbd7870cc0e..9e4d1776aa3 100644 --- a/src/ai/backend/manager/api/gql/deployment/resolver/__init__.py +++ b/src/ai/backend/manager/api/gql/deployment/resolver/__init__.py @@ -37,6 +37,7 @@ add_model_revision, inference_runtime_config, inference_runtime_configs, + promote_deployment, revision, revisions, ) @@ -84,6 +85,7 @@ "inference_runtime_configs", "add_model_revision", "activate_deployment_revision", + "promote_deployment", # Route "routes", "route", diff --git a/src/ai/backend/manager/api/gql/deployment/resolver/revision.py b/src/ai/backend/manager/api/gql/deployment/resolver/revision.py index 8a1d9a55b1e..bababc3cb31 100644 --- a/src/ai/backend/manager/api/gql/deployment/resolver/revision.py +++ b/src/ai/backend/manager/api/gql/deployment/resolver/revision.py @@ -15,6 +15,7 @@ from ai.backend.common.dto.manager.v2.deployment.request import ( AdminSearchRevisionsInput, ) +from ai.backend.common.meta.meta import NEXT_RELEASE_VERSION from ai.backend.manager.api.gql.base import encode_cursor, resolve_global_id from ai.backend.manager.api.gql.decorators import ( BackendAIGQLMeta, @@ -34,6 +35,8 @@ ModelRevisionEdge, ModelRevisionFilter, ModelRevisionOrderBy, + PromoteDeploymentInputGQL, + PromoteDeploymentPayloadGQL, ) from ai.backend.manager.api.gql.types import StrawberryGQLContext from ai.backend.manager.data.deployment.inference_runtime_config import ( @@ -181,3 +184,25 @@ async def activate_deployment_revision( activated_revision_id=ID(str(payload.activated_revision_id)), deployment_policy=DeploymentPolicyGQL.from_pydantic(payload.deployment_policy), ) + + +@gql_mutation( + BackendAIGQLMeta( + added_version=NEXT_RELEASE_VERSION, + description=( + "Manually promote a blue-green deployment awaiting promotion. " + "When `auto_promote=false`, the deployment stays in AWAITING_PROMOTION " + "once all Green routes are healthy until an operator triggers this mutation, " + "which atomically switches traffic (Green→ACTIVE, Blue→INACTIVE→TERMINATING)." + ), + ) +) # type: ignore[misc] +async def promote_deployment( + input: PromoteDeploymentInputGQL, + info: Info[StrawberryGQLContext, None], +) -> PromoteDeploymentPayloadGQL: + """Promote a deployment that is in AWAITING_PROMOTION state.""" + payload = await info.context.adapters.deployment.promote_deployment(input.to_pydantic()) + return PromoteDeploymentPayloadGQL( + deployment=ModelDeployment.from_pydantic(payload.deployment), + ) diff --git a/src/ai/backend/manager/api/gql/deployment/types/__init__.py b/src/ai/backend/manager/api/gql/deployment/types/__init__.py index 05569d85656..da74225310e 100644 --- a/src/ai/backend/manager/api/gql/deployment/types/__init__.py +++ b/src/ai/backend/manager/api/gql/deployment/types/__init__.py @@ -105,6 +105,8 @@ ModelRuntimeConfig, ModelRuntimeConfigInput, MountPermission, + PromoteDeploymentInputGQL, + PromoteDeploymentPayloadGQL, ResourceConfig, ResourceConfigInput, ResourceGroupInput, @@ -214,6 +216,8 @@ # Revision "ActivateRevisionInputGQL", "ActivateRevisionPayloadGQL", + "PromoteDeploymentInputGQL", + "PromoteDeploymentPayloadGQL", "AddRevisionInput", "AddRevisionPayload", "ClusterConfig", diff --git a/src/ai/backend/manager/api/gql/deployment/types/revision.py b/src/ai/backend/manager/api/gql/deployment/types/revision.py index 514397da82b..bc4dac1ee66 100644 --- a/src/ai/backend/manager/api/gql/deployment/types/revision.py +++ b/src/ai/backend/manager/api/gql/deployment/types/revision.py @@ -62,6 +62,9 @@ from ai.backend.common.dto.manager.v2.deployment.request import ( ModelRuntimeConfigInput as ModelRuntimeConfigInputDTO, ) +from ai.backend.common.dto.manager.v2.deployment.request import ( + PromoteDeploymentInput as PromoteDeploymentInputDTO, +) from ai.backend.common.dto.manager.v2.deployment.request import ( ResourceConfigInput as ResourceConfigInputDTO, ) @@ -83,6 +86,9 @@ from ai.backend.common.dto.manager.v2.deployment.response import ( AddRevisionPayload as AddRevisionPayloadDTO, ) +from ai.backend.common.dto.manager.v2.deployment.response import ( + PromoteDeploymentPayload as PromoteDeploymentPayloadDTO, +) from ai.backend.common.dto.manager.v2.deployment.response import ( RevisionNode as RevisionNodeDTO, ) @@ -101,6 +107,7 @@ PreStartActionInfoDTO, ResourceConfigInfoDTO, ) +from ai.backend.common.meta.meta import NEXT_RELEASE_VERSION from ai.backend.common.types import MountPermission as CommonMountPermission from ai.backend.manager.api.gql.base import ( OrderDirection, @@ -576,6 +583,29 @@ class ActivateRevisionPayloadGQL: deployment_policy: Annotated[DeploymentPolicyGQL, strawberry.lazy(".policy")] +@gql_pydantic_input( + BackendAIGQLMeta( + description="Input for manually promoting a blue-green deployment.", + added_version=NEXT_RELEASE_VERSION, + ), + name="PromoteDeploymentInput", +) +class PromoteDeploymentInputGQL(PydanticInputMixin[PromoteDeploymentInputDTO]): + deployment_id: ID + + +@gql_pydantic_type( + BackendAIGQLMeta( + added_version=NEXT_RELEASE_VERSION, + description="Result of manually promoting a deployment.", + ), + model=PromoteDeploymentPayloadDTO, + name="PromoteDeploymentPayload", +) +class PromoteDeploymentPayloadGQL: + deployment: Annotated[ModelDeployment, strawberry.lazy(".deployment")] + + # Input Types @gql_pydantic_input( BackendAIGQLMeta(description="", added_version="25.19.0"), diff --git a/src/ai/backend/manager/api/gql/schema.py b/src/ai/backend/manager/api/gql/schema.py index 42673c0e874..7bcbf77fad9 100644 --- a/src/ai/backend/manager/api/gql/schema.py +++ b/src/ai/backend/manager/api/gql/schema.py @@ -81,6 +81,7 @@ inference_runtime_configs, my_deployments, project_deployments, + promote_deployment, # Replica replica, replica_status_changed, @@ -704,6 +705,7 @@ class Mutation: reject_artifact_revision = reject_artifact_revision create_access_token = create_access_token activate_deployment_revision = activate_deployment_revision + promote_deployment = promote_deployment update_route_traffic_status = update_route_traffic_status # Fair Share - Admin APIs admin_upsert_domain_fair_share_weight = admin_upsert_domain_fair_share_weight diff --git a/src/ai/backend/manager/api/rest/v2/deployment/handler.py b/src/ai/backend/manager/api/rest/v2/deployment/handler.py index cf055e8ef3c..23224c5ed87 100644 --- a/src/ai/backend/manager/api/rest/v2/deployment/handler.py +++ b/src/ai/backend/manager/api/rest/v2/deployment/handler.py @@ -26,6 +26,7 @@ CreateDeploymentInput, DeleteAccessTokenInput, DeleteDeploymentInput, + PromoteDeploymentInput, SearchAccessTokensInput, SearchAutoScalingRulesInput, SearchDeploymentPoliciesInput, @@ -195,6 +196,15 @@ async def activate_revision( result = await self._adapter.activate_revision(body.parsed) return APIResponse.build(status_code=HTTPStatus.OK, response_model=result) + async def promote( + self, + path: PathParam[DeploymentIdPathParam], + ) -> APIResponse: + """Manually promote a blue-green deployment awaiting promotion.""" + input_dto = PromoteDeploymentInput(deployment_id=path.parsed.deployment_id) + result = await self._adapter.promote_deployment(input_dto) + return APIResponse.build(status_code=HTTPStatus.OK, response_model=result) + async def search_revision_resource_slots( self, path: PathParam[RevisionIdPathParam], diff --git a/src/ai/backend/manager/api/rest/v2/deployment/registry.py b/src/ai/backend/manager/api/rest/v2/deployment/registry.py index a06546dcc65..7925d1202bd 100644 --- a/src/ai/backend/manager/api/rest/v2/deployment/registry.py +++ b/src/ai/backend/manager/api/rest/v2/deployment/registry.py @@ -105,6 +105,12 @@ def register_v2_deployment_routes( handler.activate_revision, middlewares=[auth_required], ) + registry.add( + "POST", + "/{deployment_id}/promote", + handler.promote, + middlewares=[auth_required], + ) registry.add( "POST", "/revisions/{revision_id}/resource-slots/search", diff --git a/src/ai/backend/manager/data/deployment/types.py b/src/ai/backend/manager/data/deployment/types.py index 787d97fd59f..9489b90e306 100644 --- a/src/ai/backend/manager/data/deployment/types.py +++ b/src/ai/backend/manager/data/deployment/types.py @@ -180,11 +180,17 @@ class DeploymentLifecycleSubStep(enum.StrEnum): """Clearing deploying_revision and transitioning to READY.""" DEPLOYING_COMPLETED = "deploying_completed" """All strategy conditions satisfied; triggers revision swap.""" + DEPLOYING_AWAITING_PROMOTION = "deploying_awaiting_promotion" + """All new routes healthy; waiting for manual approval or delay timer.""" @classmethod def deploying_handler_sub_steps(cls) -> tuple[DeploymentLifecycleSubStep, ...]: """Sub-steps that have their own deploying handler (excludes COMPLETED, which is an evaluator outcome).""" - return (cls.DEPLOYING_PROVISIONING, cls.DEPLOYING_ROLLING_BACK) + return ( + cls.DEPLOYING_PROVISIONING, + cls.DEPLOYING_AWAITING_PROMOTION, + cls.DEPLOYING_ROLLING_BACK, + ) @dataclass(frozen=True) diff --git a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py index e062b83df84..78e2d5b706a 100644 --- a/src/ai/backend/manager/repositories/deployment/db_source/db_source.py +++ b/src/ai/backend/manager/repositories/deployment/db_source/db_source.py @@ -2850,6 +2850,7 @@ async def apply_strategy_mutations( self, rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, + promote: BatchUpdater[RoutingRow] | None, completed_ids: set[uuid.UUID], ) -> int: """Apply route mutations from a strategy evaluation cycle in a single transaction. @@ -2862,6 +2863,7 @@ async def apply_strategy_mutations( """ async with self._begin_session_read_committed() as db_sess: await self._create_routes(db_sess, rollout) + await self._promote_routes(db_sess, promote) await self._drain_routes(db_sess, drain) return await self._complete_deployment_revision_swap(db_sess, completed_ids) @@ -2874,6 +2876,15 @@ async def _create_routes( if rollout: await execute_rbac_entity_creators(db_sess, rollout) + @staticmethod + async def _promote_routes( + db_sess: SASession, + promote: BatchUpdater[RoutingRow] | None, + ) -> None: + """Promote routes by activating their traffic status.""" + if promote: + await execute_batch_updater(db_sess, promote) + @staticmethod async def _drain_routes( db_sess: SASession, diff --git a/src/ai/backend/manager/repositories/deployment/repository.py b/src/ai/backend/manager/repositories/deployment/repository.py index ce6e7706237..946aa6316d7 100644 --- a/src/ai/backend/manager/repositories/deployment/repository.py +++ b/src/ai/backend/manager/repositories/deployment/repository.py @@ -63,6 +63,7 @@ RouteInfo, RouteSearchResult, RouteStatus, + RouteTrafficStatus, ScalingGroupCleanupConfig, ) from ai.backend.manager.data.image.types import ImageIdentifier @@ -79,6 +80,7 @@ from ai.backend.manager.models.deployment_revision import DeploymentRevisionRow from ai.backend.manager.models.endpoint import EndpointRow, EndpointTokenRow from ai.backend.manager.models.routing import RoutingRow +from ai.backend.manager.models.routing.conditions import RouteConditions as RouteIdConditions from ai.backend.manager.models.scheduling_history import ( DeploymentHistoryRow, RouteHistoryRow, @@ -92,6 +94,7 @@ from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator from ai.backend.manager.repositories.base.updater import BatchUpdater, Updater from ai.backend.manager.repositories.base.upserter import Upserter +from ai.backend.manager.repositories.deployment.creators import RouteBatchUpdaterSpec from ai.backend.manager.repositories.scheduler.types.session_creation import DeploymentContext from .db_source import DeploymentDBSource @@ -1602,11 +1605,12 @@ async def apply_strategy_mutations( self, rollout: Sequence[RBACEntityCreator[RoutingRow]], drain: BatchUpdater[RoutingRow] | None, + promote: BatchUpdater[RoutingRow] | None, completed_ids: set[UUID], ) -> int: """Apply route mutations from a strategy evaluation cycle. - Performs route rollout/drain and revision swap in a single transaction. + Performs route rollout/drain/promote and revision swap in a single transaction. Sub-step transitions are handled by the coordinator via ``EndpointLifecycleBatchUpdaterSpec``. @@ -1616,9 +1620,53 @@ async def apply_strategy_mutations( return await self._db_source.apply_strategy_mutations( rollout=rollout, drain=drain, + promote=promote, completed_ids=completed_ids, ) + @deployment_repository_resilience.apply() + async def promote_deployment( + self, + deployment_id: UUID, + promote_route_ids: list[UUID], + drain_route_ids: list[UUID], + ) -> int: + """Promote a deployment by switching traffic from blue to green routes. + + Atomically promotes green routes (→ACTIVE), drains blue routes (→TERMINATING), + and swaps deploying_revision → current_revision. + + Returns: + Number of deployments whose revision was swapped (0 or 1). + """ + promote: BatchUpdater[RoutingRow] | None = None + if promote_route_ids: + promote = BatchUpdater( + spec=RouteBatchUpdaterSpec( + traffic_ratio=1.0, + traffic_status=RouteTrafficStatus.ACTIVE, + ), + conditions=[RouteIdConditions.by_ids(promote_route_ids)], + ) + + drain: BatchUpdater[RoutingRow] | None = None + if drain_route_ids: + drain = BatchUpdater( + spec=RouteBatchUpdaterSpec( + status=RouteStatus.TERMINATING, + traffic_ratio=0.0, + traffic_status=RouteTrafficStatus.INACTIVE, + ), + conditions=[RouteIdConditions.by_ids(drain_route_ids)], + ) + + return await self._db_source.apply_strategy_mutations( + rollout=[], + drain=drain, + promote=promote, + completed_ids={deployment_id}, + ) + @deployment_repository_resilience.apply() async def clear_deploying_revision(self, deployment_ids: set[UUID]) -> None: """Clear deploying_revision and sub_step for rolled-back deployments. diff --git a/src/ai/backend/manager/services/deployment/actions/revision_operations/__init__.py b/src/ai/backend/manager/services/deployment/actions/revision_operations/__init__.py index ba7be288a05..f6f3fa40c6d 100644 --- a/src/ai/backend/manager/services/deployment/actions/revision_operations/__init__.py +++ b/src/ai/backend/manager/services/deployment/actions/revision_operations/__init__.py @@ -4,8 +4,14 @@ ActivateRevisionAction, ActivateRevisionActionResult, ) +from .promote_deployment import ( + PromoteDeploymentAction, + PromoteDeploymentActionResult, +) __all__ = [ "ActivateRevisionAction", "ActivateRevisionActionResult", + "PromoteDeploymentAction", + "PromoteDeploymentActionResult", ] diff --git a/src/ai/backend/manager/services/deployment/actions/revision_operations/promote_deployment.py b/src/ai/backend/manager/services/deployment/actions/revision_operations/promote_deployment.py new file mode 100644 index 00000000000..d71abcbbf99 --- /dev/null +++ b/src/ai/backend/manager/services/deployment/actions/revision_operations/promote_deployment.py @@ -0,0 +1,44 @@ +"""Action for manually promoting a blue-green deployment.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import override +from uuid import UUID + +from ai.backend.manager.actions.action import BaseActionResult +from ai.backend.manager.actions.types import ActionOperationType +from ai.backend.manager.data.deployment.types import ModelDeploymentData + +from .base import RevisionOperationBaseAction + + +@dataclass +class PromoteDeploymentAction(RevisionOperationBaseAction): + """Action to manually promote a blue-green deployment. + + Triggers immediate traffic switch from blue (old) to green (new) routes + when the deployment is in AWAITING_PROMOTION state. + """ + + deployment_id: UUID + + @override + def entity_id(self) -> str | None: + return str(self.deployment_id) + + @override + @classmethod + def operation_type(cls) -> ActionOperationType: + return ActionOperationType.UPDATE + + +@dataclass +class PromoteDeploymentActionResult(BaseActionResult): + """Result of promoting a deployment.""" + + deployment: ModelDeploymentData + + @override + def entity_id(self) -> str | None: + return str(self.deployment.id) diff --git a/src/ai/backend/manager/services/deployment/processors.py b/src/ai/backend/manager/services/deployment/processors.py index 4764987d8b6..2b7655cc48e 100644 --- a/src/ai/backend/manager/services/deployment/processors.py +++ b/src/ai/backend/manager/services/deployment/processors.py @@ -101,6 +101,8 @@ from ai.backend.manager.services.deployment.actions.revision_operations import ( ActivateRevisionAction, ActivateRevisionActionResult, + PromoteDeploymentAction, + PromoteDeploymentActionResult, ) from ai.backend.manager.services.deployment.actions.route import ( SearchRoutesAction, @@ -172,6 +174,7 @@ class DeploymentProcessors(AbstractProcessorPackage): SearchRevisionResourceSlotsAction, SearchRevisionResourceSlotsActionResult ] activate_revision: ActionProcessor[ActivateRevisionAction, ActivateRevisionActionResult] + promote_deployment: ActionProcessor[PromoteDeploymentAction, PromoteDeploymentActionResult] # Route operations sync_replicas: ActionProcessor[SyncReplicaAction, SyncReplicaActionResult] @@ -254,6 +257,7 @@ def __init__( service.search_revision_resource_slots, action_monitors ) self.activate_revision = ActionProcessor(service.activate_revision, action_monitors) + self.promote_deployment = ActionProcessor(service.promote_deployment, action_monitors) # Route operations self.sync_replicas = ActionProcessor(service.sync_replicas, action_monitors) @@ -313,6 +317,7 @@ def supported_actions(self) -> list[ActionSpec]: SearchRevisionsAction.spec(), SearchRevisionResourceSlotsAction.spec(), ActivateRevisionAction.spec(), + PromoteDeploymentAction.spec(), # Route operations SyncReplicaAction.spec(), SearchRoutesAction.spec(), diff --git a/src/ai/backend/manager/services/deployment/service.py b/src/ai/backend/manager/services/deployment/service.py index 1ef4fd86506..7f95050ed2c 100644 --- a/src/ai/backend/manager/services/deployment/service.py +++ b/src/ai/backend/manager/services/deployment/service.py @@ -65,6 +65,7 @@ ) from ai.backend.manager.models.deployment_revision_preset.types import PresetValueEntry from ai.backend.manager.models.endpoint import EndpointRow, EndpointTokenRow +from ai.backend.manager.repositories.base import BatchQuerier, NoPagination from ai.backend.manager.repositories.base.rbac.entity_creator import RBACEntityCreator from ai.backend.manager.repositories.base.upserter import Upserter from ai.backend.manager.repositories.deployment import DeploymentRepository @@ -80,6 +81,9 @@ EndpointTokenCreatorSpec, ModelRevisionFields, ) +from ai.backend.manager.repositories.deployment.options import ( + RouteConditions as RouteQueryConditions, +) from ai.backend.manager.repositories.deployment.upserters import DeploymentPolicyUpserterSpec from ai.backend.manager.repositories.deployment_revision_preset.repository import ( DeploymentRevisionPresetRepository, @@ -178,6 +182,8 @@ from ai.backend.manager.services.deployment.actions.revision_operations import ( ActivateRevisionAction, ActivateRevisionActionResult, + PromoteDeploymentAction, + PromoteDeploymentActionResult, ) from ai.backend.manager.services.deployment.actions.route import ( SearchRoutesAction, @@ -1085,6 +1091,77 @@ async def activate_revision( deployment_policy=deployment_policy, ) + async def promote_deployment( + self, action: PromoteDeploymentAction + ) -> PromoteDeploymentActionResult: + """Manually promote a blue-green deployment. + + Directly switches traffic from blue (old) to green (new) routes + when the deployment is in AWAITING_PROMOTION state. This bypasses + the FSM cycle and applies the promote/drain atomically. + """ + deployment_info = await self._deployment_repository.get_endpoint_info(action.deployment_id) + + if deployment_info.sub_step != DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION: + raise InvalidEndpointState( + f"Deployment {action.deployment_id} is not in AWAITING_PROMOTION state " + f"(current sub_step: {deployment_info.sub_step}). " + "Manual promotion is only allowed during AWAITING_PROMOTION." + ) + + deploying_revision_id = deployment_info.deploying_revision_id + if deploying_revision_id is None: + raise InvalidEndpointState( + f"Deployment {action.deployment_id} has no deploying_revision_id." + ) + + # Fetch non-terminated routes for this deployment + route_search = await self._deployment_repository.search_routes( + BatchQuerier( + pagination=NoPagination(), + conditions=[ + RouteQueryConditions.by_endpoint_ids({action.deployment_id}), + RouteQueryConditions.exclude_statuses([RouteStatus.TERMINATED]), + ], + ) + ) + + # Classify into green (promote) and blue (drain) + promote_route_ids = [] + drain_route_ids = [] + for route in route_search.items: + if route.revision_id == deploying_revision_id: + if route.health_status == RouteHealthStatus.HEALTHY: + promote_route_ids.append(route.route_id) + else: + if route.status.is_active(): + drain_route_ids.append(route.route_id) + + if not promote_route_ids: + raise InvalidEndpointState( + f"Deployment {action.deployment_id} has no healthy green routes to promote. " + "Wait for new-revision routes to become HEALTHY before promoting." + ) + + await self._deployment_repository.promote_deployment( + deployment_id=action.deployment_id, + promote_route_ids=promote_route_ids, + drain_route_ids=drain_route_ids, + ) + + log.info( + "Manually promoted deployment {}: {} routes promoted, {} routes drained", + action.deployment_id, + len(promote_route_ids), + len(drain_route_ids), + ) + + deployment_info = await self._deployment_repository.get_endpoint_info(action.deployment_id) + + return PromoteDeploymentActionResult( + deployment=_convert_deployment_info_to_data(deployment_info), + ) + # ========== Route Operations ========== async def sync_replicas(self, action: SyncReplicaAction) -> SyncReplicaActionResult: diff --git a/src/ai/backend/manager/sokovan/deployment/coordinator.py b/src/ai/backend/manager/sokovan/deployment/coordinator.py index 5d63b63726d..2227bd68b10 100644 --- a/src/ai/backend/manager/sokovan/deployment/coordinator.py +++ b/src/ai/backend/manager/sokovan/deployment/coordinator.py @@ -66,6 +66,7 @@ from .handlers import ( CheckPendingDeploymentHandler, CheckReplicaDeploymentHandler, + DeployingAwaitingPromotionHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, DeploymentHandler, @@ -323,6 +324,16 @@ def _init_handlers( deployment_repo=self._deployment_repository, ), ), + ( + ( + DeploymentLifecycleType.DEPLOYING, + DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION, + ), + DeployingAwaitingPromotionHandler( + deployment_controller=self._deployment_controller, + deployment_repository=self._deployment_repository, + ), + ), ( ( DeploymentLifecycleType.DEPLOYING, diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py index 961e4544307..c0c3f4d8d31 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/__init__.py @@ -4,6 +4,7 @@ from .base import DeploymentHandler from .deploying import ( + DeployingAwaitingPromotionHandler, DeployingProvisioningHandler, DeployingRollingBackHandler, ) @@ -16,6 +17,7 @@ __all__ = [ "CheckPendingDeploymentHandler", "CheckReplicaDeploymentHandler", + "DeployingAwaitingPromotionHandler", "DeployingProvisioningHandler", "DeployingRollingBackHandler", "DeploymentHandler", diff --git a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py index 107568ecac8..c00fa835170 100644 --- a/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py +++ b/src/ai/backend/manager/sokovan/deployment/handlers/deploying.py @@ -1,21 +1,26 @@ """Handlers for DEPLOYING sub-steps (BEP-1049). -Two DEPLOYING handlers are registered in the coordinator's HandlerRegistry: +Three DEPLOYING handlers are registered in the coordinator's HandlerRegistry: -- **DeployingProvisioningHandler**: Runs the strategy FSM each cycle to - create/drain routes and check for completion. +- **DeployingProvisioningHandler**: Creates new-revision routes and waits for + them to become HEALTHY; advances to AWAITING_PROMOTION on success. +- **DeployingAwaitingPromotionHandler**: All new routes healthy; waiting for + promotion trigger (manual approval or delay timer); advances to READY on + COMPLETED, or ROLLING_BACK on timeout/give_up. - **DeployingRollingBackHandler**: Clears ``deploying_revision`` and transitions directly to READY. Sub-step flow:: - PROVISIONING ──(need_retry)──▸ PROVISIONING (route mutations, logged) - │ - │ (success) - ▼ - READY (completed — all routes replaced) - - PROVISIONING ──(timeout)──▸ ROLLING_BACK ──(success)──▸ READY + PROVISIONING ──(success)──▸ AWAITING_PROMOTION + │ │ + │ (expired/give_up) ┌──────┴──────┐ + ▼ ▼ ▼ + ROLLING_BACK COMPLETED ROLLING_BACK + │ │ │ + │ (success) │ (success) │ (success) + ▼ ▼ ▼ + READY READY READY The evaluator determines sub-step assignments and route mutations; the applier persists them to DB atomically. Each handler classifies @@ -26,18 +31,26 @@ from __future__ import annotations import logging +import uuid from collections.abc import Sequence +from datetime import UTC, datetime from typing import override from uuid import UUID from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.data.deployment.types import ( + DeploymentInfo, DeploymentLifecycleStatus, DeploymentLifecycleSubStep, DeploymentStatusTransitions, + RouteHealthStatus, + RouteStatus, ) from ai.backend.manager.data.model_serving.types import EndpointLifecycle from ai.backend.manager.defs import LockID +from ai.backend.manager.models.deployment_policy import BlueGreenSpec +from ai.backend.manager.models.routing.conditions import RouteConditions +from ai.backend.manager.repositories.base import BatchQuerier, NoPagination from ai.backend.manager.repositories.deployment.repository import DeploymentRepository from ai.backend.manager.sokovan.deployment.deployment_controller import DeploymentController from ai.backend.manager.sokovan.deployment.executor import DeploymentExecutor @@ -69,14 +82,10 @@ class DeployingProvisioningHandler(DeploymentHandler): """Handler for the DEPLOYING / PROVISIONING sub-step. - Runs the strategy FSM each cycle to create/drain routes and check - for completion. Classification: - - - **Route mutations executed** (create/drain): need_retry — stays in - PROVISIONING with a new history record for progress tracking. - Never escalated to give_up (normal progress). - - **No changes** (routes still warming up): skipped — no history. - - **Completed** (all old routes replaced): success → READY. + New-revision routes are being created; waiting for them to become HEALTHY. + The evaluator assigns sub-steps; when all new routes are healthy the + deployment advances to AWAITING_PROMOTION (success), otherwise it stays in + PROVISIONING (skipped — no state transition). """ def __init__( @@ -120,17 +129,17 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: def status_transitions(cls) -> DeploymentStatusTransitions: return DeploymentStatusTransitions( success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.READY, - sub_step=None, - ), - need_retry=DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, - sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION, ), expired=DeploymentLifecycleStatus( lifecycle=EndpointLifecycle.DEPLOYING, sub_step=DeploymentLifecycleSubStep.DEPLOYING_ROLLING_BACK, ), + give_up=DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.DEPLOYING, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_ROLLING_BACK, + ), ) async def _ensure_endpoints_registered( @@ -170,81 +179,48 @@ async def execute( except Exception as exc: log.exception("Pre-registration step failed: {}", exc) failed_registration_ids = { - d.deployment_info.id - for d in deployments - if not d.deployment_info.network.url - and d.deployment_info.deploying_revision_id is not None + deployment.deployment_info.id + for deployment in deployments + if not deployment.deployment_info.network.url + and deployment.deployment_info.deploying_revision_id is not None } if failed_registration_ids: deployments = [ - d for d in deployments if d.deployment_info.id not in failed_registration_ids + deployment + for deployment in deployments + if deployment.deployment_info.id not in failed_registration_ids ] - deployment_infos = [d.deployment_info for d in deployments] - deployment_map = {d.deployment_info.id: d for d in deployments} + deployment_infos = [deployment.deployment_info for deployment in deployments] + deployment_map = {deployment.deployment_info.id: deployment for deployment in deployments} summary = await self._evaluator.evaluate(deployment_infos) - apply_result = await self._applier.apply(summary) - - # Filter out deployments marked for destruction during DEPLOYING. - destroying_ids = { - d.deployment_info.id - for d in deployments - if d.deployment_info.state.lifecycle - in (EndpointLifecycle.DESTROYING, EndpointLifecycle.DESTROYED) - } - if destroying_ids: - log.warning( - "Skipping {} deployments with DESTROYING/DESTROYED lifecycle during DEPLOYING", - len(destroying_ids), - ) + await self._applier.apply(summary) successes: list[DeploymentWithHistory] = [] - failures: list[DeploymentExecutionError] = [] skipped: list[DeploymentWithHistory] = [] - # COMPLETED → success (coordinator transitions to READY) - for endpoint_id in apply_result.completed_ids: - if endpoint_id in destroying_ids: - continue - deployment = deployment_map.get(endpoint_id) - if deployment is not None: - successes.append(deployment) - - # Evaluation errors → failures - for error_data in summary.errors: - deployment = deployment_map.get(error_data.deployment.id) - if deployment is not None: - failures.append( - DeploymentExecutionError( - deployment_info=deployment, - reason=error_data.reason, - error_detail=error_data.reason, - ) - ) - - # Classify rest: route mutations happened → failures (recoverable, coordinator - # will classify as need_retry), no changes → skipped (no history). - completed_or_error_ids = apply_result.completed_ids | { - e.deployment.id for e in summary.errors - } - has_route_mutations = bool(apply_result.routes_created or apply_result.routes_drained) for deployment in deployments: endpoint_id = deployment.deployment_info.id - if endpoint_id in completed_or_error_ids or endpoint_id in destroying_ids: + assigned = summary.assignments.get(endpoint_id) + if assigned is None: continue - if has_route_mutations: - failures.append( - DeploymentExecutionError( - deployment_info=deployment, - reason="Route mutations in progress", - error_detail="Waiting for route provisioning to complete", - ) - ) + if assigned == DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION: + successes.append(deployment) else: skipped.append(deployment) - return DeploymentExecutionResult(successes=successes, failures=failures, skipped=skipped) + errors = [ + DeploymentExecutionError( + deployment_info=deployment_map[evaluation_error.deployment.id], + reason=evaluation_error.reason, + error_detail=evaluation_error.reason, + ) + for evaluation_error in summary.errors + if evaluation_error.deployment.id in deployment_map + ] + + return DeploymentExecutionResult(successes=successes, failures=errors, skipped=skipped) @override async def post_process(self, result: DeploymentExecutionResult) -> None: @@ -252,17 +228,142 @@ async def post_process(self, result: DeploymentExecutionResult) -> None: DeploymentLifecycleType.DEPLOYING, sub_step=DeploymentLifecycleSubStep.DEPLOYING_PROVISIONING, ) + await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) + + +class DeployingAwaitingPromotionHandler(DeploymentHandler): + """Handler for DEPLOYING / AWAITING_PROMOTION. + + Checks whether auto-promotion conditions are met + (``auto_promote=True`` + delay elapsed). If so, executes + promote/drain via the repository and returns *success* so the + coordinator transitions to READY. + + Otherwise the deployment is *skipped* and stays in + AWAITING_PROMOTION until the user calls ``promote_deployment`` + or the deploying timeout expires. + """ + + def __init__( + self, + deployment_controller: DeploymentController, + deployment_repository: DeploymentRepository, + ) -> None: + self._deployment_controller = deployment_controller + self._deployment_repository = deployment_repository + + @classmethod + @override + def name(cls) -> str: + return "deploying-awaiting-promotion" + + @property + @override + def lock_id(self) -> LockID | None: + return LockID.LOCKID_DEPLOYMENT_DEPLOYING + + @classmethod + @override + def target_statuses(cls) -> list[DeploymentLifecycleStatus]: + return [ + DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.DEPLOYING, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION, + ), + ] + + @classmethod + @override + def status_transitions(cls) -> DeploymentStatusTransitions: + ready = DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.READY, + sub_step=None, + ) + rolling_back = DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.DEPLOYING, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_ROLLING_BACK, + ) + return DeploymentStatusTransitions( + success=ready, + expired=rolling_back, + give_up=rolling_back, + ) + + @override + async def execute( + self, deployments: Sequence[DeploymentWithHistory] + ) -> DeploymentExecutionResult: + successes: list[DeploymentWithHistory] = [] + skipped: list[DeploymentWithHistory] = [] + + for deployment in deployments: + info = deployment.deployment_info + policy = info.policy + if policy is None or not isinstance(policy.strategy_spec, BlueGreenSpec): + skipped.append(deployment) + continue + + spec: BlueGreenSpec = policy.strategy_spec + if not spec.auto_promote: + skipped.append(deployment) + continue + + if spec.promote_delay_seconds > 0 and deployment.phase_started_at is not None: + phase_started_at = deployment.phase_started_at + if phase_started_at.tzinfo is None: + phase_started_at = phase_started_at.replace(tzinfo=UTC) + elapsed = (datetime.now(UTC) - phase_started_at).total_seconds() + if elapsed < spec.promote_delay_seconds: + skipped.append(deployment) + continue + + promote_route_ids, drain_route_ids = await self._classify_routes(info) + await self._deployment_repository.promote_deployment( + deployment_id=info.id, + promote_route_ids=promote_route_ids, + drain_route_ids=drain_route_ids, + ) + log.info("deployment {}: auto-promoted", info.id) + successes.append(deployment) + + return DeploymentExecutionResult(successes=successes, skipped=skipped) + + async def _classify_routes( + self, + info: DeploymentInfo, + ) -> tuple[list[uuid.UUID], list[uuid.UUID]]: + route_search = await self._deployment_repository.search_routes( + BatchQuerier( + pagination=NoPagination(), + conditions=[ + RouteConditions.by_endpoint_ids({info.id}), + RouteConditions.exclude_statuses([RouteStatus.TERMINATED]), + ], + ) + ) + promote_route_ids: list[uuid.UUID] = [] + drain_route_ids: list[uuid.UUID] = [] + for route in route_search.items: + if route.revision_id == info.deploying_revision_id: + if route.health_status == RouteHealthStatus.HEALTHY: + promote_route_ids.append(route.route_id) + elif route.status.is_active(): + drain_route_ids.append(route.route_id) + return promote_route_ids, drain_route_ids + + @override + async def post_process(self, result: DeploymentExecutionResult) -> None: await self._deployment_controller.mark_lifecycle_needed( DeploymentLifecycleType.DEPLOYING, - sub_step=DeploymentLifecycleSubStep.DEPLOYING_ROLLING_BACK, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION, ) - await self._route_controller.mark_lifecycle_needed(RouteLifecycleType.PROVISIONING) class DeployingRollingBackHandler(DeploymentHandler): """Handler for DEPLOYING / ROLLING_BACK sub-step. - Clears ``deploying_revision`` and transitions directly to READY. + Clears ``deploying_revision`` and transitions to READY, + completing the rollback process. """ def __init__( @@ -298,11 +399,15 @@ def target_statuses(cls) -> list[DeploymentLifecycleStatus]: @classmethod @override def status_transitions(cls) -> DeploymentStatusTransitions: + ready = DeploymentLifecycleStatus( + lifecycle=EndpointLifecycle.READY, + sub_step=None, + ) return DeploymentStatusTransitions( - success=DeploymentLifecycleStatus( - lifecycle=EndpointLifecycle.READY, - sub_step=None, - ), + success=ready, + need_retry=None, + expired=ready, + give_up=ready, ) @override diff --git a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py index 47835450e40..24ce3d775bb 100644 --- a/src/ai/backend/manager/sokovan/deployment/strategy/applier.py +++ b/src/ai/backend/manager/sokovan/deployment/strategy/applier.py @@ -90,6 +90,7 @@ async def apply(self, summary: StrategyEvaluationSummary) -> StrategyApplyResult swapped = await self._deployment_repo.apply_strategy_mutations( rollout=rollout, drain=drain, + promote=None, completed_ids=completed_ids, ) diff --git a/tests/unit/manager/repositories/deployment/test_deployment_repository.py b/tests/unit/manager/repositories/deployment/test_deployment_repository.py index 786c343972f..d4d0fb282fd 100644 --- a/tests/unit/manager/repositories/deployment/test_deployment_repository.py +++ b/tests/unit/manager/repositories/deployment/test_deployment_repository.py @@ -33,6 +33,7 @@ ) from ai.backend.manager.data.auth.hash import PasswordHashAlgorithm from ai.backend.manager.data.deployment.types import ( + DeploymentLifecycleSubStep, DeploymentPolicyData, ModelRevisionData, RouteStatus, @@ -3441,6 +3442,282 @@ async def test_update_route_not_found( assert result is False +class TestPromoteDeployment(TestRouteOperations): + """Test cases for DeploymentRepository.promote_deployment (blue-green).""" + + @pytest.fixture + async def db_with_cleanup( + self, + database_connection: ExtendedAsyncSAEngine, + ) -> AsyncGenerator[ExtendedAsyncSAEngine, None]: + """Override parent fixture to include KeyPairRow (UserRow FK dependency).""" + async with with_tables( + database_connection, + [ + DomainRow, + ScalingGroupRow, + ResourcePresetRow, + UserResourcePolicyRow, + ProjectResourcePolicyRow, + KeyPairResourcePolicyRow, + RoleRow, + UserRoleRow, + UserRow, + KeyPairRow, + GroupRow, + VFolderRow, + ContainerRegistryRow, + ImageRow, + ResourceSlotTypeRow, + AgentRow, + SessionRow, + KernelRow, + EndpointRow, + DeploymentRevisionRow, + DeploymentRevisionResourceSlotRow, + RoutingRow, + AssociationScopesEntitiesRow, + ], + ): + async with database_connection.begin_session() as sess: + for slot_name, slot_type in [("cpu", "count"), ("mem", "bytes")]: + await sess.execute( + sa.text( + "INSERT INTO resource_slot_types (slot_name, slot_type, rank)" + " VALUES (:slot_name, :slot_type, 0)" + " ON CONFLICT DO NOTHING" + ), + {"slot_name": slot_name, "slot_type": slot_type}, + ) + yield database_connection + + @pytest.fixture + async def test_current_revision_id(self) -> uuid.UUID: + return uuid.uuid4() + + @pytest.fixture + async def test_deploying_revision_id(self) -> uuid.UUID: + return uuid.uuid4() + + @pytest.fixture + async def promote_test_endpoint_id( + self, + db_with_cleanup: ExtendedAsyncSAEngine, + test_domain_name: str, + test_scaling_group_name: str, + test_user_uuid: uuid.UUID, + test_group_id: uuid.UUID, + test_current_revision_id: uuid.UUID, + test_deploying_revision_id: uuid.UUID, + ) -> uuid.UUID: + """Create an endpoint in DEPLOYING/AWAITING_PROMOTION with deploying_revision set.""" + endpoint_id = uuid.uuid4() + async with db_with_cleanup.begin_session() as db_sess: + endpoint = EndpointRow( + id=endpoint_id, + name=f"promote-endpoint-{uuid.uuid4().hex[:8]}", + created_user=test_user_uuid, + session_owner=test_user_uuid, + domain=test_domain_name, + project=test_group_id, + resource_group=test_scaling_group_name, + desired_replicas=1, + url=f"http://promote-{uuid.uuid4().hex[:8]}.example.com", + open_to_public=False, + lifecycle_stage=EndpointLifecycle.DEPLOYING, + current_revision=test_current_revision_id, + deploying_revision=test_deploying_revision_id, + sub_step=DeploymentLifecycleSubStep.DEPLOYING_AWAITING_PROMOTION, + ) + db_sess.add(endpoint) + await db_sess.commit() + return endpoint_id + + async def _create_route( + self, + db_with_cleanup: ExtendedAsyncSAEngine, + endpoint_id: uuid.UUID, + revision_id: uuid.UUID, + user_uuid: uuid.UUID, + domain_name: str, + group_id: uuid.UUID, + status: RouteStatus, + traffic_status: RouteTrafficStatus, + traffic_ratio: float, + ) -> uuid.UUID: + route_id = uuid.uuid4() + async with db_with_cleanup.begin_session() as db_sess: + route = RoutingRow( + id=route_id, + endpoint=endpoint_id, + session=None, + session_owner=user_uuid, + domain=domain_name, + project=group_id, + status=status, + traffic_ratio=traffic_ratio, + traffic_status=traffic_status, + revision=revision_id, + ) + db_sess.add(route) + await db_sess.commit() + return route_id + + async def test_promote_activates_green_drains_blue_and_swaps_revision( + self, + deployment_repository: DeploymentRepository, + db_with_cleanup: ExtendedAsyncSAEngine, + promote_test_endpoint_id: uuid.UUID, + test_user_uuid: uuid.UUID, + test_domain_name: str, + test_group_id: uuid.UUID, + test_current_revision_id: uuid.UUID, + test_deploying_revision_id: uuid.UUID, + ) -> None: + """Promoting routes must flip green INACTIVE→ACTIVE, mark blue routes + TERMINATING+INACTIVE, and swap deploying_revision into current_revision.""" + green_route_id = await self._create_route( + db_with_cleanup, + endpoint_id=promote_test_endpoint_id, + revision_id=test_deploying_revision_id, + user_uuid=test_user_uuid, + domain_name=test_domain_name, + group_id=test_group_id, + status=RouteStatus.RUNNING, + traffic_status=RouteTrafficStatus.INACTIVE, + traffic_ratio=0.0, + ) + blue_route_id = await self._create_route( + db_with_cleanup, + endpoint_id=promote_test_endpoint_id, + revision_id=test_current_revision_id, + user_uuid=test_user_uuid, + domain_name=test_domain_name, + group_id=test_group_id, + status=RouteStatus.RUNNING, + traffic_status=RouteTrafficStatus.ACTIVE, + traffic_ratio=1.0, + ) + + swapped = await deployment_repository.promote_deployment( + deployment_id=promote_test_endpoint_id, + promote_route_ids=[green_route_id], + drain_route_ids=[blue_route_id], + ) + + assert swapped == 1 + + async with db_with_cleanup.begin_readonly_session() as db_sess: + green_row = ( + await db_sess.execute(sa.select(RoutingRow).where(RoutingRow.id == green_route_id)) + ).scalar_one() + assert green_row.traffic_status == RouteTrafficStatus.ACTIVE + assert green_row.traffic_ratio == 1.0 + assert green_row.status == RouteStatus.RUNNING + + blue_row = ( + await db_sess.execute(sa.select(RoutingRow).where(RoutingRow.id == blue_route_id)) + ).scalar_one() + assert blue_row.status == RouteStatus.TERMINATING + assert blue_row.traffic_status == RouteTrafficStatus.INACTIVE + assert blue_row.traffic_ratio == 0.0 + + endpoint_row = ( + await db_sess.execute( + sa.select(EndpointRow).where(EndpointRow.id == promote_test_endpoint_id) + ) + ).scalar_one() + assert endpoint_row.current_revision == test_deploying_revision_id + assert endpoint_row.deploying_revision is None + assert endpoint_row.sub_step is None + + async def test_promote_with_only_green_routes_swaps_revision( + self, + deployment_repository: DeploymentRepository, + db_with_cleanup: ExtendedAsyncSAEngine, + promote_test_endpoint_id: uuid.UUID, + test_user_uuid: uuid.UUID, + test_domain_name: str, + test_group_id: uuid.UUID, + test_deploying_revision_id: uuid.UUID, + ) -> None: + """Promoting with no drain routes (initial deployment) must still swap the + revision and activate the green route.""" + green_route_id = await self._create_route( + db_with_cleanup, + endpoint_id=promote_test_endpoint_id, + revision_id=test_deploying_revision_id, + user_uuid=test_user_uuid, + domain_name=test_domain_name, + group_id=test_group_id, + status=RouteStatus.RUNNING, + traffic_status=RouteTrafficStatus.INACTIVE, + traffic_ratio=0.0, + ) + + swapped = await deployment_repository.promote_deployment( + deployment_id=promote_test_endpoint_id, + promote_route_ids=[green_route_id], + drain_route_ids=[], + ) + + assert swapped == 1 + + async with db_with_cleanup.begin_readonly_session() as db_sess: + green_row = ( + await db_sess.execute(sa.select(RoutingRow).where(RoutingRow.id == green_route_id)) + ).scalar_one() + assert green_row.traffic_status == RouteTrafficStatus.ACTIVE + assert green_row.traffic_ratio == 1.0 + + endpoint_row = ( + await db_sess.execute( + sa.select(EndpointRow).where(EndpointRow.id == promote_test_endpoint_id) + ) + ).scalar_one() + assert endpoint_row.current_revision == test_deploying_revision_id + assert endpoint_row.deploying_revision is None + + async def test_promote_without_deploying_revision_does_not_swap( + self, + deployment_repository: DeploymentRepository, + db_with_cleanup: ExtendedAsyncSAEngine, + test_domain_name: str, + test_scaling_group_name: str, + test_user_uuid: uuid.UUID, + test_group_id: uuid.UUID, + ) -> None: + """If the endpoint has no deploying_revision, revision swap is a no-op + (returns 0) — the DB filter guards against mis-targeted promotions.""" + endpoint_id = uuid.uuid4() + async with db_with_cleanup.begin_session() as db_sess: + endpoint = EndpointRow( + id=endpoint_id, + name=f"no-deploying-{uuid.uuid4().hex[:8]}", + created_user=test_user_uuid, + session_owner=test_user_uuid, + domain=test_domain_name, + project=test_group_id, + resource_group=test_scaling_group_name, + desired_replicas=1, + url=f"http://no-deploying-{uuid.uuid4().hex[:8]}.example.com", + open_to_public=False, + lifecycle_stage=EndpointLifecycle.READY, + current_revision=uuid.uuid4(), + deploying_revision=None, + ) + db_sess.add(endpoint) + await db_sess.commit() + + swapped = await deployment_repository.promote_deployment( + deployment_id=endpoint_id, + promote_route_ids=[], + drain_route_ids=[], + ) + + assert swapped == 0 + + class TestDeploymentRepositoryDuplicateName: """Test cases for duplicate endpoint name validation via DeploymentRepository."""