diff --git a/Makefile b/Makefile index 81280795..14294012 100644 --- a/Makefile +++ b/Makefile @@ -53,10 +53,11 @@ ENVTEST_K8S_VERSION ?= 1.28 all: manager ci: test # Run tests +# Uses setup-envtest from controller-runtime (storage.googleapis.com/kubebuilder-tools is deprecated) +ENVTEST_K8S_VERSION ?= 1.28.0 test: generate fmt vet manifests - mkdir -p ${ENVTEST_ASSETS_DIR} - @command -v $(ENVTEST) >/dev/null 2>&1 || GOBIN=$(GOBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.22 - KUBEBUILDER_ASSETS="$$($(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" GOTOOLCHAIN=go1.25.0+auto go test ./... -coverprofile cover.out + go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.22 + KUBEBUILDER_ASSETS="$$($(GOBIN)/setup-envtest use -p path $(ENVTEST_K8S_VERSION))" GOTOOLCHAIN=go1.25.0+auto go test ./... -coverprofile cover.out # Build manager binary manager: generate fmt vet diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index ffe7a386..59e4258a 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -69,6 +69,7 @@ const ( AnnotationRepo = "picchu.medium.engineering/repo" AnnotationCanaryStartedTimestamp = "picchu.medium.engineering/canaryStartedTimestamp" AnnotationAutoscaler = "picchu.medium.engineering/autoscaler" + AnnotationRamping = "picchu.medium.engineering/ramping" AutoscalerTypeHPA = "hpa" AutoscalerTypeWPA = "wpa" diff --git a/api/v1alpha1/source_defaults.go b/api/v1alpha1/source_defaults.go index 63731f31..11efe7b5 100644 --- a/api/v1alpha1/source_defaults.go +++ b/api/v1alpha1/source_defaults.go @@ -107,7 +107,11 @@ func SetPortDefaults(port *PortInfo) { func SetScaleDefaults(scale *ScaleInfo) { if scale.Default == 0 { - scale.Default = defaultScaleDefault + if scale.Min != nil && *scale.Min > 0 { + scale.Default = *scale.Min + } else { + scale.Default = defaultScaleDefault + } } if scale.Max == 0 { scale.Max = defaultScaleMax diff --git a/api/v1alpha1/source_defaults_test.go b/api/v1alpha1/source_defaults_test.go new file mode 100644 index 00000000..b5faeee5 --- /dev/null +++ b/api/v1alpha1/source_defaults_test.go @@ -0,0 +1,70 @@ +package v1alpha1 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSetScaleDefaults(t *testing.T) { + for _, tc := range []struct { + name string + scale ScaleInfo + expected int32 + }{ + { + name: "Default 0 with Min 16 uses Min", + scale: ScaleInfo{ + Min: int32Ptr(16), + Default: 0, + Max: 96, + }, + expected: 16, + }, + { + name: "Default 0 with Min 1 uses Min", + scale: ScaleInfo{ + Min: int32Ptr(1), + Default: 0, + Max: 1, + }, + expected: 1, + }, + { + name: "Default 0 with Min nil uses 1", + scale: ScaleInfo{ + Min: nil, + Default: 0, + Max: 0, + }, + expected: 1, + }, + { + name: "Default 0 with Min 0 uses 1", + scale: ScaleInfo{ + Min: int32Ptr(0), + Default: 0, + Max: 0, + }, + expected: 1, + }, + { + name: "Default already set is unchanged", + scale: ScaleInfo{ + Min: int32Ptr(16), + Default: 48, + Max: 96, + }, + expected: 48, + }, + } { + t.Run(tc.name, func(t *testing.T) { + SetScaleDefaults(&tc.scale) + assert.Equal(t, tc.expected, tc.scale.Default) + }) + } +} + +func int32Ptr(v int32) *int32 { + return &v +} diff --git a/controllers/incarnation.go b/controllers/incarnation.go index 4580aedc..137119b0 100644 --- a/controllers/incarnation.go +++ b/controllers/incarnation.go @@ -296,6 +296,46 @@ func (i *Incarnation) peakPercent() uint32 { return i.getStatus().PeakPercent } +// getBaseCapacityForRamp returns the 100% capacity baseline for replica allocation during ramp. +// Must match CanRampTo's baseCapacity logic so we allocate enough replicas to pass the ramp check. +func (i *Incarnation) getBaseCapacityForRamp() int32 { + target := i.target() + if target == nil || target.Scale.Min == nil { + return 0 + } + rm := i.controller.getReleaseManager() + if rm == nil { + return int32(*target.Scale.Min) + } + var totalPods int32 = 0 + var totalTrafficPercent uint32 = 0 + var hasUnscaledRevision bool = false + for _, rev := range rm.Status.Revisions { + if rev.Tag == i.tag { + continue + } + if rev.CurrentPercent > 0 && rev.Scale.Current > 0 { + totalPods += int32(rev.Scale.Current) + totalTrafficPercent += rev.CurrentPercent + if rev.PeakPercent == 100 && rev.CurrentPercent < 100 { + hasUnscaledRevision = true + } + } + } + if totalPods == 0 || totalTrafficPercent == 0 { + if *target.Scale.Min > 0 { + return int32(*target.Scale.Min) + } + // Scale.Min==0 (e.g. worker that can scale to zero): use Scale.Max as conservative baseline + return target.Scale.Max + } + if hasUnscaledRevision { + return totalPods + } + normalizedCapacity := float64(totalPods) / (float64(totalTrafficPercent) / 100.0) + return int32(math.Ceil(normalizedCapacity)) +} + func (i *Incarnation) getExternalTestStatus() ExternalTestStatus { target := i.target() if target == nil { @@ -376,10 +416,20 @@ func (i *Incarnation) sync(ctx context.Context) error { var replicas int32 if i.target().Scale.HasAutoscaler() { - replicas = i.divideReplicas(i.target().Scale.Default) + // When ramping, use total capacity (all revisions) so we allocate enough replicas + // for our traffic share. E.g. old=163@15%, new=144@85% → total=307 → new needs ~261. + // Using only other revisions' pods undercounts when we already have pods. + capacity := i.target().Scale.Default + if i.status.State.Current == "canarying" || i.status.State.Current == "releasing" { + // Use only other revisions' pods (baseCapacity). Including our own pods causes + // over-allocation feedback loop (e.g. tutu: 450+383=833 → new gets 752 → total 820). + if base := i.getBaseCapacityForRamp(); base > 0 { + capacity = base + } + } + replicas = i.divideReplicas(capacity) } else { replicas = i.divideReplicasNoAutoscale(i.target().Scale.Default) - } syncPlan := &rmplan.SyncRevision{ App: i.appName(), @@ -411,6 +461,9 @@ func (i *Incarnation) sync(ctx context.Context) error { EventDriven: i.isEventDriven(), TopologySpreadConstraint: i.target().TopologySpreadConstraint, PodDisruptionBudget: i.target().PodDisruptionBudget, + // Only disable autoscaler when ramping UP (canarying/releasing). Keep HPA for released + // incarnations so they scale down based on traffic as we ramp the new revision. + Ramping: i.target().Scale.HasAutoscaler() && (i.status.State.Current == "canarying" || i.status.State.Current == "releasing"), } if !i.isRoutable() { @@ -509,6 +562,10 @@ func (i *Incarnation) genScalePlan(ctx context.Context) *rmplan.ScaleRevision { min = max } else if (i.target().Scale.Worker != nil || i.target().Scale.KedaWorker != nil) && *i.target().Scale.Min == 0 { min = 0 + } else if i.target().Scale.HasAutoscaler() && i.status.State.Current == "released" { + // Give HPA full Scale.Min to Scale.Max; it downscales based on observed load. + min = i.controller.divideReplicas(*i.target().Scale.Min, 100) + max = i.controller.divideReplicas(i.target().Scale.Max, 100) } return &rmplan.ScaleRevision{ @@ -525,6 +582,7 @@ func (i *Incarnation) genScalePlan(ctx context.Context) *rmplan.ScaleRevision { KedaWorker: i.target().Scale.KedaWorker, EventDriven: i.isEventDriven(), ServiceAccountName: i.appName(), + Ramping: i.target().Scale.HasAutoscaler() && (i.status.State.Current == "canarying" || i.status.State.Current == "releasing"), } } diff --git a/controllers/incarnation_test.go b/controllers/incarnation_test.go index e31c2651..c0bdbf1c 100644 --- a/controllers/incarnation_test.go +++ b/controllers/incarnation_test.go @@ -1,6 +1,7 @@ package controllers import ( + "context" ttesting "testing" "time" @@ -279,6 +280,121 @@ func TestIncarnation_divideReplicasNoAutoscale(t *ttesting.T) { } } +// TestGetBaseCapacityForRamp verifies getBaseCapacityForRamp returns the correct capacity +// for replica allocation during ramp, matching CanRampTo's logic. +func TestGetBaseCapacityForRamp(t *ttesting.T) { + scaleMin := int32(16) + scaleMinZero := int32(0) + for _, tc := range []struct { + name string + opts []testIncarnationOption + scaleMin *int32 + scaleMax int32 + expected int32 + }{ + { + name: "no other revisions uses Scale.Min", + opts: []testIncarnationOption{&testClusters{Clusters: 4}}, + scaleMin: &scaleMin, + scaleMax: 96, + expected: 16, + }, + { + name: "old revision at 100% with 120 pods (hasUnscaledRevision) uses totalPods", + opts: []testIncarnationOption{&testClusters{Clusters: 4}, withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{{Tag: "old-rev", CurrentPercent: 60, PeakPercent: 100, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 120}}}}}, + scaleMin: &scaleMin, + scaleMax: 96, + expected: 120, + }, + { + name: "old revision scaled down uses normalized capacity", + opts: []testIncarnationOption{&testClusters{Clusters: 4}, withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{{Tag: "old-rev", CurrentPercent: 60, PeakPercent: 60, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 60}}}}}, + scaleMin: &scaleMin, + scaleMax: 96, + expected: 100, // 60 / 0.6 = 100 + }, + { + name: "Scale.Min=0 with no other revisions uses Scale.Max (e.g. jubilee worker)", + opts: []testIncarnationOption{&testClusters{Clusters: 4}}, + scaleMin: &scaleMinZero, + scaleMax: 60, + expected: 60, // Scale.Max when Scale.Min=0 + }, + { + name: "excludes our own pods to avoid over-allocation feedback loop", + opts: []testIncarnationOption{&testClusters{Clusters: 4}, withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{{Tag: "old-rev", CurrentPercent: 15, PeakPercent: 100, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 163}}}}}, + scaleMin: &scaleMin, + scaleMax: 320, + expected: 163, // other revisions only; our 144 pods must NOT be included + }, + } { + t.Run(tc.name, func(t *ttesting.T) { + i := createTestIncarnation("new-rev", releasing, 1, tc.opts...) + i.revision.Spec.Targets[0].Scale.Min = tc.scaleMin + i.revision.Spec.Targets[0].Scale.Max = tc.scaleMax + if tc.name == "excludes our own pods to avoid over-allocation feedback loop" { + i.status.Scale.Current = 144 + } + assert.Equal(t, tc.expected, i.getBaseCapacityForRamp()) + }) + } +} + +// TestGenScalePlanRampingByState verifies that Ramping is only true for canarying/releasing +// states. Released incarnations keep their HPA so it can scale down based on traffic. +func TestGenScalePlanRampingByState(t *ttesting.T) { + cpuTarget := int32(70) + scaleMax := int32(10) + for _, tc := range []struct { + name string + state State + expectRamp bool + hasAutoscale bool + }{ + {"canarying ramps", canarying, true, true}, + {"releasing ramps", releasing, true, true}, + {"released does not ramp", released, false, true}, + {"deploying does not ramp", deploying, false, true}, + {"deployed does not ramp", deployed, false, true}, + {"pendingrelease does not ramp", pendingrelease, false, true}, + {"no autoscaler never ramps", released, false, false}, + } { + t.Run(tc.name, func(t *ttesting.T) { + i := createTestIncarnation("test", tc.state, 50) + if tc.hasAutoscale { + i.revision.Spec.Targets[0].Scale.TargetCPUUtilizationPercentage = &cpuTarget + i.revision.Spec.Targets[0].Scale.Max = scaleMax + } + plan := i.genScalePlan(context.Background()) + assert.Equal(t, tc.expectRamp, plan.Ramping, "state=%s", tc.state) + }) + } +} + +// TestGenScalePlanReleasedKeepsHPA verifies that released revisions keep normal HPA +// min/max (Scale.Min, Scale.Max). HPA downscales based on observed load, not preemptive pinning. +func TestGenScalePlanReleasedKeepsHPA(t *ttesting.T) { + scaleMin := int32(16) + scaleMax := int32(320) + i := createTestIncarnation("old-rev", released, 15, &testClusters{Clusters: 4}, + withOtherRevisions{ + Revisions: []picchu.ReleaseManagerRevisionStatus{ + {Tag: "new-rev", CurrentPercent: 85, PeakPercent: 85, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 144}}, + }, + }) + i.revision.Spec.Targets[0].Scale.Min = &scaleMin + i.revision.Spec.Targets[0].Scale.Max = scaleMax + i.revision.Spec.Targets[0].Scale.TargetCPUUtilizationPercentage = ptr(int32(70)) + + plan := i.genScalePlan(context.Background()) + + // Released gets full Scale.Min to Scale.Max; HPA downscales naturally based on CPU load. + assert.Equal(t, i.controller.divideReplicas(16, 100), plan.Min, "released keeps full Scale.Min") + assert.Equal(t, i.controller.divideReplicas(320, 100), plan.Max, "released keeps full Scale.Max") +} + +func ptr[T any](v T) *T { return &v } + func Test_IsExpired(t *ttesting.T) { for _, test := range []struct { Name string diff --git a/controllers/plan/scaleRevision.go b/controllers/plan/scaleRevision.go index de0a0c57..945fc638 100644 --- a/controllers/plan/scaleRevision.go +++ b/controllers/plan/scaleRevision.go @@ -37,9 +37,31 @@ type ScaleRevision struct { KedaWorker *picchuv1alpha1.KedaScaleInfo EventDriven bool ServiceAccountName string + Ramping bool // When true, delete autoscaler so Picchu can control ReplicaSet replicas directly } func (p *ScaleRevision) Apply(ctx context.Context, cli client.Client, cluster *picchuv1alpha1.Cluster, log logr.Logger) error { + if p.Ramping { + // Delete autoscalers so Picchu can control ReplicaSet replicas directly during ramp + objects := []client.Object{ + &autoscaling.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace}, + }, + &kedav1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace}, + }, + &wpav1.WorkerPodAutoScaler{ + ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace}, + }, + } + for _, obj := range objects { + if err := utils.DeleteIfExists(ctx, cli, obj); err != nil { + return err + } + } + return nil + } + if p.Min > p.Max { p.Max = p.Min } diff --git a/controllers/plan/scaleRevision_test.go b/controllers/plan/scaleRevision_test.go index 8321ead4..9970e2ca 100644 --- a/controllers/plan/scaleRevision_test.go +++ b/controllers/plan/scaleRevision_test.go @@ -295,3 +295,75 @@ func TestDontScaleRevision(t *testing.T) { assert.NoError(t, plan.Apply(ctx, m, halfCluster, log), "Shouldn't return error.") } + +// TestScaleRevisionRampingDeletesAutoscalers verifies that when Ramping=true, we delete +// HPA/KEDA/WPA and return early without creating an autoscaler. This allows Picchu to +// control ReplicaSet replicas directly during ramp-up (canarying/releasing). +func TestScaleRevisionRampingDeletesAutoscalers(t *testing.T) { + log := test.MustNewLogger() + ctrl := gomock.NewController(t) + m := mocks.NewMockClient(ctrl) + defer ctrl.Finish() + + var thirty int32 = 30 + plan := &ScaleRevision{ + Tag: "testtag", + Namespace: "testnamespace", + Min: 4, + Max: 10, + CPUTarget: &thirty, + Labels: map[string]string{}, + Ramping: true, + } + ctx := context.TODO() + + // When Ramping=true, we delete HPA, KEDA, WPA and return early. No HPA create/update. + m.EXPECT().Delete(ctx, gomock.Any()).Return(nil).Times(3) + + assert.NoError(t, plan.Apply(ctx, m, halfCluster, log)) +} + +// TestScaleRevisionNotRampingCreatesHPA verifies that when Ramping=false, we create +// the HPA as normal. Released incarnations keep their HPA so it can scale down +// based on traffic as we ramp up the new revision. +func TestScaleRevisionNotRampingCreatesHPA(t *testing.T) { + log := test.MustNewLogger() + ctrl := gomock.NewController(t) + m := mocks.NewMockClient(ctrl) + defer ctrl.Finish() + + var thirty int32 = 30 + plan := &ScaleRevision{ + Tag: "testtag", + Namespace: "testnamespace", + Min: 4, + Max: 10, + CPUTarget: &thirty, + Labels: map[string]string{}, + Ramping: false, + } + ok := client.ObjectKey{Name: "testtag", Namespace: "testnamespace"} + ctx := context.TODO() + + hpa := &autoscaling.HorizontalPodAutoscaler{ + Spec: autoscaling.HorizontalPodAutoscalerSpec{ + MaxReplicas: 0, + }, + } + + expected := mocks.Callback(func(x interface{}) bool { + switch o := x.(type) { + case *autoscaling.HorizontalPodAutoscaler: + return o.Spec.MaxReplicas == 5 && + *o.Spec.Metrics[0].Resource.Target.AverageUtilization == 30 && + len(o.Spec.Metrics) == 1 + default: + return false + } + }, "match expected hpa") + + m.EXPECT().Get(ctx, mocks.ObjectKey(ok), mocks.UpdateHPASpec(hpa)).Return(nil).Times(1) + m.EXPECT().Update(ctx, expected).Return(nil).Times(1) + + assert.NoError(t, plan.Apply(ctx, m, halfCluster, log)) +} diff --git a/controllers/plan/syncRevision.go b/controllers/plan/syncRevision.go index e2f96973..f5baf4b1 100644 --- a/controllers/plan/syncRevision.go +++ b/controllers/plan/syncRevision.go @@ -90,6 +90,7 @@ type SyncRevision struct { TopologySpreadConstraint *corev1.TopologySpreadConstraint ExternalSecrets []es.ExternalSecret EventDriven bool + Ramping bool // When true, Picchu controls ReplicaSet replicas directly (autoscaler disabled) } func (p *SyncRevision) Printable() interface{} { @@ -380,6 +381,9 @@ func (p *SyncRevision) syncReplicaSet( rsAnnotations := map[string]string{ picchuv1alpha1.AnnotationAutoscaler: autoScaler, } + if p.Ramping { + rsAnnotations[picchuv1alpha1.AnnotationRamping] = "true" + } if p.IAMRole != "" { template.Annotations[picchuv1alpha1.AnnotationIAMRole] = p.IAMRole diff --git a/controllers/plan/syncRevision_test.go b/controllers/plan/syncRevision_test.go index 55d1f65c..03425da7 100644 --- a/controllers/plan/syncRevision_test.go +++ b/controllers/plan/syncRevision_test.go @@ -450,6 +450,104 @@ func TestSyncRevisionNoChange(t *testing.T) { common.ResourcesEqual(t, defaultExpectedReplicaSet, &rsl.Items[0]) } +// TestSyncRevisionRampingAnnotation verifies that when Ramping=true, the ReplicaSet +// gets the ramping annotation so plan/common.go allows Picchu to control replicas. +func TestSyncRevisionRampingAnnotation(t *testing.T) { + ctx := context.TODO() + log := test.MustNewLogger() + plan := *defaultRevisionPlan + plan.Ramping = true + cli := fakeClient(defaultServiceAccount) + + rsl := &appsv1.ReplicaSetList{} + assert.NoError(t, plan.Apply(ctx, cli, halfCluster, log)) + assert.NoError(t, cli.List(ctx, rsl)) + assert.Equal(t, 1, len(rsl.Items)) + assert.Equal(t, "true", rsl.Items[0].Annotations[picchuv1alpha1.AnnotationRamping]) +} + +// TestSyncRevisionRampingUpdatesReplicas verifies that when Ramping=true, plan/common.go +// updates the ReplicaSet replicas even when the existing RS has non-zero replicas (e.g. +// from a previous HPA scale-up). This is the critical path for Picchu controlling +// replicas during ramp-up. +func TestSyncRevisionRampingUpdatesReplicas(t *testing.T) { + ctx := context.TODO() + log := test.MustNewLogger() + existing := defaultExpectedReplicaSet.DeepCopy() + var five int32 = 5 + existing.Spec.Replicas = &five + // No ramping annotation - simulates RS that was scaled by HPA before we disabled it + delete(existing.Annotations, picchuv1alpha1.AnnotationRamping) + cli := fakeClient(existing, defaultServiceAccount) + + plan := *defaultRevisionPlan + plan.Ramping = true + plan.Replicas = 3 + // Use cluster with scaling factor 1.0 so desired replicas = 3 + assert.NoError(t, plan.Apply(ctx, cli, cluster, log)) + + rsl := &appsv1.ReplicaSetList{} + assert.NoError(t, cli.List(ctx, rsl)) + assert.Equal(t, 1, len(rsl.Items)) + assert.Equal(t, int32(3), *rsl.Items[0].Spec.Replicas, "Ramping=true should update replicas from 5 to 3") + assert.Equal(t, "true", rsl.Items[0].Annotations[picchuv1alpha1.AnnotationRamping]) +} + +// TestSyncRevisionRampingUpdatesWPAReplicas verifies that when Ramping=true, +// plan/common.go updates ReplicaSet replicas even for WPA targets. Jubilee workers +// use WPA; during ramp we delete the WPA and must control replicas directly. +func TestSyncRevisionRampingUpdatesWPAReplicas(t *testing.T) { + ctx := context.TODO() + log := test.MustNewLogger() + existing := defaultExpectedReplicaSet.DeepCopy() + existing.Annotations[picchuv1alpha1.AnnotationAutoscaler] = picchuv1alpha1.AutoscalerTypeWPA + var two int32 = 2 + existing.Spec.Replicas = &two + delete(existing.Annotations, picchuv1alpha1.AnnotationRamping) + cli := fakeClient(existing, defaultServiceAccount) + + targetMsgs := int32(8) + secsStr := "0.11" + plan := *defaultRevisionPlan + plan.Worker = &picchuv1alpha1.WorkerScaleInfo{ + QueueURI: "https://sqs.us-east-1.amazonaws.com/123/queue", + TargetMessagesPerWorker: &targetMsgs, + SecondsToProcessOneJobString: &secsStr, + } + plan.Ramping = true + plan.Replicas = 6 + assert.NoError(t, plan.Apply(ctx, cli, cluster, log)) + + rsl := &appsv1.ReplicaSetList{} + assert.NoError(t, cli.List(ctx, rsl)) + assert.Equal(t, 1, len(rsl.Items)) + assert.Equal(t, int32(6), *rsl.Items[0].Spec.Replicas, + "Ramping=true should update WPA ReplicaSet replicas from 2 to 6 (WPA is deleted during ramp)") +} + +// TestSyncRevisionNotRampingPreservesReplicas verifies that when Ramping=false, +// plan/common.go does NOT update the ReplicaSet replicas when both existing and +// desired are non-zero. Released incarnations keep their HPA-controlled replica count. +func TestSyncRevisionNotRampingPreservesReplicas(t *testing.T) { + ctx := context.TODO() + log := test.MustNewLogger() + existing := defaultExpectedReplicaSet.DeepCopy() + var five int32 = 5 + existing.Spec.Replicas = &five + delete(existing.Annotations, picchuv1alpha1.AnnotationRamping) + cli := fakeClient(existing, defaultServiceAccount) + + plan := *defaultRevisionPlan + plan.Ramping = false + plan.Replicas = 3 + assert.NoError(t, plan.Apply(ctx, cli, cluster, log)) + + rsl := &appsv1.ReplicaSetList{} + assert.NoError(t, cli.List(ctx, rsl)) + assert.Equal(t, 1, len(rsl.Items)) + assert.Equal(t, int32(5), *rsl.Items[0].Spec.Replicas, "Ramping=false should preserve existing replicas (HPA controls them)") +} + func TestSyncRevisionWithChange(t *testing.T) { ctx := context.TODO() log := test.MustNewLogger() diff --git a/controllers/syncer_test.go b/controllers/syncer_test.go index cfe1b1ba..3f1ca4c2 100644 --- a/controllers/syncer_test.go +++ b/controllers/syncer_test.go @@ -42,9 +42,9 @@ type testClusters struct { func (t testClusters) Apply(i *Incarnation, currentPercent int) { if t.Clusters > 0 { var clusters []ClusterInfo - for i := 0; i < t.Clusters; i++ { + for idx := 0; idx < t.Clusters; idx++ { clusters = append(clusters, ClusterInfo{ - Name: fmt.Sprintf("cluster-%d", i), + Name: fmt.Sprintf("cluster-%d", idx), Live: true, ScalingFactor: 1.0, }) @@ -61,6 +61,17 @@ func (t testClusters) Apply(i *Incarnation, currentPercent int) { } } +// withOtherRevisions populates ReleaseManager.Status.Revisions for getBaseCapacityForRamp tests +type withOtherRevisions struct { + Revisions []picchuv1alpha1.ReleaseManagerRevisionStatus +} + +func (w withOtherRevisions) Apply(i *Incarnation, currentPercent int) { + if ctrl, ok := i.controller.(*IncarnationController); ok && ctrl.releaseManager != nil { + ctrl.releaseManager.Status.Revisions = w.Revisions + } +} + func createTestIncarnation(tag string, currentState State, currentPercent int, options ...testIncarnationOption) *Incarnation { scaleMin := int32(1) incarnation := &Incarnation{ diff --git a/hack/kustom-parse.sh b/hack/kustom-parse.sh index ec37210a..f27f68a0 100755 --- a/hack/kustom-parse.sh +++ b/hack/kustom-parse.sh @@ -2,7 +2,11 @@ mkdir -p resources -which yq || python3 -m pip install yq +# Ensure yq is available (pip install puts it in ~/.local/bin which may not be in PATH) +if ! command -v yq &>/dev/null; then + python3 -m pip install --user yq + export PATH="${HOME}/.local/bin:${PATH}" +fi pushd resources kustomize build ../config/crd | csplit - '/^---$/' {4} diff --git a/plan/common.go b/plan/common.go index 78c9ed54..58c6fdf3 100644 --- a/plan/common.go +++ b/plan/common.go @@ -390,18 +390,22 @@ func CreateOrUpdate( log.Info("Resource is ignored", "namespace", rs.Namespace, "name", rs.Name, "kind", kind) return nil } - if typed.Annotations[picchu.AnnotationAutoscaler] != picchu.AutoscalerTypeWPA { // Allow WorkerPodAutoScaler to manipulate Replicas on its own - // Only update replicas if we are changing to/from zero, which means the replicaset is being retired/deployed + // When ramping, Picchu controls replicas (WPA/HPA deleted). Otherwise, WPA controls its own replicas. + ramping := typed.Annotations[picchu.AnnotationRamping] == "true" + isWPA := typed.Annotations[picchu.AnnotationAutoscaler] == picchu.AutoscalerTypeWPA + if !isWPA || ramping { var zero int32 = 0 if rs.Spec.Replicas == nil { rs.Spec.Replicas = &zero } - if *typed.Spec.Replicas == 0 || *rs.Spec.Replicas == 0 { + if ramping || *typed.Spec.Replicas == 0 || *rs.Spec.Replicas == 0 { *rs.Spec.Replicas = *typed.Spec.Replicas } } // end replicas logic + rs.Annotations = typed.Annotations + if len(rs.Spec.Template.Labels) == 0 { rs.Spec.Template = typed.Spec.Template }