7 changes: 4 additions & 3 deletions Makefile
@@ -53,10 +53,11 @@ ENVTEST_K8S_VERSION ?= 1.28
all: manager
ci: test
# Run tests
# Uses setup-envtest from controller-runtime (storage.googleapis.com/kubebuilder-tools is deprecated)
ENVTEST_K8S_VERSION ?= 1.28.0
test: generate fmt vet manifests
	mkdir -p ${ENVTEST_ASSETS_DIR}
	@command -v $(ENVTEST) >/dev/null 2>&1 || GOBIN=$(GOBIN) go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.22
	KUBEBUILDER_ASSETS="$$($(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" GOTOOLCHAIN=go1.25.0+auto go test ./... -coverprofile cover.out
	go install sigs.k8s.io/controller-runtime/tools/setup-envtest@release-0.22
	KUBEBUILDER_ASSETS="$$($(GOBIN)/setup-envtest use -p path $(ENVTEST_K8S_VERSION))" GOTOOLCHAIN=go1.25.0+auto go test ./... -coverprofile cover.out

# Build manager binary
manager: generate fmt vet
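Note: the KUBEBUILDER_ASSETS path exported by `setup-envtest use -p path` is what controller-runtime's envtest package uses to locate the etcd and kube-apiserver test binaries. A minimal sketch of a test harness that consumes it (this TestMain wiring is illustrative, not part of this PR):

package controllers

import (
	"os"
	"testing"

	"sigs.k8s.io/controller-runtime/pkg/envtest"
)

func TestMain(m *testing.M) {
	// envtest resolves the control-plane binaries via KUBEBUILDER_ASSETS,
	// which the Makefile sets from `setup-envtest use -p path $(ENVTEST_K8S_VERSION)`.
	testEnv := &envtest.Environment{}
	if _, err := testEnv.Start(); err != nil {
		panic(err)
	}
	code := m.Run()
	_ = testEnv.Stop()
	os.Exit(code)
}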
1 change: 1 addition & 0 deletions api/v1alpha1/common.go
@@ -69,6 +69,7 @@ const (
	AnnotationRepo                   = "picchu.medium.engineering/repo"
	AnnotationCanaryStartedTimestamp = "picchu.medium.engineering/canaryStartedTimestamp"
	AnnotationAutoscaler             = "picchu.medium.engineering/autoscaler"
	AnnotationRamping                = "picchu.medium.engineering/ramping"

	AutoscalerTypeHPA = "hpa"
	AutoscalerTypeWPA = "wpa"
6 changes: 5 additions & 1 deletion api/v1alpha1/source_defaults.go
@@ -107,7 +107,11 @@ func SetPortDefaults(port *PortInfo) {

func SetScaleDefaults(scale *ScaleInfo) {
	if scale.Default == 0 {
		if scale.Min != nil && *scale.Min > 0 {
			scale.Default = *scale.Min
		} else {
			scale.Default = defaultScaleDefault
		}
	}
	if scale.Max == 0 {
		scale.Max = defaultScaleMax
70 changes: 70 additions & 0 deletions api/v1alpha1/source_defaults_test.go
@@ -0,0 +1,70 @@
package v1alpha1

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestSetScaleDefaults(t *testing.T) {
	for _, tc := range []struct {
		name     string
		scale    ScaleInfo
		expected int32
	}{
		{
			name: "Default 0 with Min 16 uses Min",
			scale: ScaleInfo{
				Min:     int32Ptr(16),
				Default: 0,
				Max:     96,
			},
			expected: 16,
		},
		{
			name: "Default 0 with Min 1 uses Min",
			scale: ScaleInfo{
				Min:     int32Ptr(1),
				Default: 0,
				Max:     1,
			},
			expected: 1,
		},
		{
			name: "Default 0 with Min nil uses 1",
			scale: ScaleInfo{
				Min:     nil,
				Default: 0,
				Max:     0,
			},
			expected: 1,
		},
		{
			name: "Default 0 with Min 0 uses 1",
			scale: ScaleInfo{
				Min:     int32Ptr(0),
				Default: 0,
				Max:     0,
			},
			expected: 1,
		},
		{
			name: "Default already set is unchanged",
			scale: ScaleInfo{
				Min:     int32Ptr(16),
				Default: 48,
				Max:     96,
			},
			expected: 48,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			SetScaleDefaults(&tc.scale)
			assert.Equal(t, tc.expected, tc.scale.Default)
		})
	}
}

func int32Ptr(v int32) *int32 {
	return &v
}
62 changes: 60 additions & 2 deletions controllers/incarnation.go
@@ -296,6 +296,46 @@ func (i *Incarnation) peakPercent() uint32 {
	return i.getStatus().PeakPercent
}

// getBaseCapacityForRamp returns the 100% capacity baseline for replica allocation during ramp.
// Must match CanRampTo's baseCapacity logic so we allocate enough replicas to pass the ramp check.
func (i *Incarnation) getBaseCapacityForRamp() int32 {
	target := i.target()
	if target == nil || target.Scale.Min == nil {
		return 0
	}
	rm := i.controller.getReleaseManager()
	if rm == nil {
		return int32(*target.Scale.Min)
	}
	var totalPods int32 = 0
	var totalTrafficPercent uint32 = 0
	var hasUnscaledRevision bool = false
	for _, rev := range rm.Status.Revisions {
		if rev.Tag == i.tag {
			continue
		}
		if rev.CurrentPercent > 0 && rev.Scale.Current > 0 {
			totalPods += int32(rev.Scale.Current)
			totalTrafficPercent += rev.CurrentPercent
			if rev.PeakPercent == 100 && rev.CurrentPercent < 100 {
				hasUnscaledRevision = true
			}
		}
	}
	if totalPods == 0 || totalTrafficPercent == 0 {
		if *target.Scale.Min > 0 {
			return int32(*target.Scale.Min)
		}
		// Scale.Min==0 (e.g. worker that can scale to zero): use Scale.Max as conservative baseline
		return target.Scale.Max
	}
	if hasUnscaledRevision {
		return totalPods
	}
	normalizedCapacity := float64(totalPods) / (float64(totalTrafficPercent) / 100.0)
	return int32(math.Ceil(normalizedCapacity))
}

func (i *Incarnation) getExternalTestStatus() ExternalTestStatus {
	target := i.target()
	if target == nil {
@@ -376,10 +416,20 @@ func (i *Incarnation) sync(ctx context.Context) error {

	var replicas int32
	if i.target().Scale.HasAutoscaler() {
		capacity := i.target().Scale.Default
		if i.status.State.Current == "canarying" || i.status.State.Current == "releasing" {
			// When ramping, derive the 100% baseline from the other revisions' pods only
			// (getBaseCapacityForRamp) so we allocate enough replicas for our traffic share.
			// Including our own pods would cause an over-allocation feedback loop
			// (e.g. tutu: 450+383=833 → new gets 752 → total 820).
			if base := i.getBaseCapacityForRamp(); base > 0 {
				capacity = base
			}
		}
		replicas = i.divideReplicas(capacity)
	} else {
		replicas = i.divideReplicasNoAutoscale(i.target().Scale.Default)
	}
	syncPlan := &rmplan.SyncRevision{
		App: i.appName(),
@@ -411,6 +461,9 @@
		EventDriven:              i.isEventDriven(),
		TopologySpreadConstraint: i.target().TopologySpreadConstraint,
		PodDisruptionBudget:      i.target().PodDisruptionBudget,
		// Only disable autoscaler when ramping UP (canarying/releasing). Keep HPA for released
		// incarnations so they scale down based on traffic as we ramp the new revision.
		Ramping: i.target().Scale.HasAutoscaler() && (i.status.State.Current == "canarying" || i.status.State.Current == "releasing"),
	}

	if !i.isRoutable() {
@@ -509,6 +562,10 @@ func (i *Incarnation) genScalePlan(ctx context.Context) *rmplan.ScaleRevision {
		min = max
	} else if (i.target().Scale.Worker != nil || i.target().Scale.KedaWorker != nil) && *i.target().Scale.Min == 0 {
		min = 0
	} else if i.target().Scale.HasAutoscaler() && i.status.State.Current == "released" {
		// Give HPA full Scale.Min to Scale.Max; it downscales based on observed load.
		min = i.controller.divideReplicas(*i.target().Scale.Min, 100)
		max = i.controller.divideReplicas(i.target().Scale.Max, 100)
	}

	return &rmplan.ScaleRevision{
@@ -525,6 +582,7 @@ func (i *Incarnation) genScalePlan(ctx context.Context) *rmplan.ScaleRevision {
		KedaWorker:         i.target().Scale.KedaWorker,
		EventDriven:        i.isEventDriven(),
		ServiceAccountName: i.appName(),
		Ramping:            i.target().Scale.HasAutoscaler() && (i.status.State.Current == "canarying" || i.status.State.Current == "releasing"),
	}
}

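Note: to make getBaseCapacityForRamp's arithmetic concrete, here is a runnable walkthrough of the normalized-capacity branch, using the numbers from the test cases below (the standalone main is illustrative only):

package main

import (
	"fmt"
	"math"
)

func main() {
	// From the "old revision scaled down" test case: the one other revision
	// serves 60% of traffic with 60 pods.
	totalPods := int32(60)
	totalTrafficPercent := uint32(60)

	// 60 pods carry 60% of traffic, so full (100%) capacity is 60 / 0.60 = 100.
	normalized := float64(totalPods) / (float64(totalTrafficPercent) / 100.0)
	baseCapacity := int32(math.Ceil(normalized))
	fmt.Println(baseCapacity) // 100

	// Contrast with the hasUnscaledRevision case: an old revision with
	// PeakPercent=100 that still holds 120 pods short-circuits to
	// totalPods=120, since those pods already represent full capacity.
}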
116 changes: 116 additions & 0 deletions controllers/incarnation_test.go
@@ -1,6 +1,7 @@
package controllers

import (
	"context"
	ttesting "testing"
	"time"

@@ -279,6 +280,121 @@ func TestIncarnation_divideReplicasNoAutoscale(t *ttesting.T) {
	}
}

// TestGetBaseCapacityForRamp verifies getBaseCapacityForRamp returns the correct capacity
// for replica allocation during ramp, matching CanRampTo's logic.
func TestGetBaseCapacityForRamp(t *ttesting.T) {
	scaleMin := int32(16)
	scaleMinZero := int32(0)
	for _, tc := range []struct {
		name     string
		opts     []testIncarnationOption
		scaleMin *int32
		scaleMax int32
		expected int32
	}{
		{
			name:     "no other revisions uses Scale.Min",
			opts:     []testIncarnationOption{&testClusters{Clusters: 4}},
			scaleMin: &scaleMin,
			scaleMax: 96,
			expected: 16,
		},
		{
			name: "old revision at 100% with 120 pods (hasUnscaledRevision) uses totalPods",
			opts: []testIncarnationOption{
				&testClusters{Clusters: 4},
				withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{
					{Tag: "old-rev", CurrentPercent: 60, PeakPercent: 100, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 120}},
				}},
			},
			scaleMin: &scaleMin,
			scaleMax: 96,
			expected: 120,
		},
		{
			name: "old revision scaled down uses normalized capacity",
			opts: []testIncarnationOption{
				&testClusters{Clusters: 4},
				withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{
					{Tag: "old-rev", CurrentPercent: 60, PeakPercent: 60, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 60}},
				}},
			},
			scaleMin: &scaleMin,
			scaleMax: 96,
			expected: 100, // 60 / 0.6 = 100
		},
		{
			name:     "Scale.Min=0 with no other revisions uses Scale.Max (e.g. jubilee worker)",
			opts:     []testIncarnationOption{&testClusters{Clusters: 4}},
			scaleMin: &scaleMinZero,
			scaleMax: 60,
			expected: 60, // Scale.Max when Scale.Min=0
		},
		{
			name: "excludes our own pods to avoid over-allocation feedback loop",
			opts: []testIncarnationOption{
				&testClusters{Clusters: 4},
				withOtherRevisions{Revisions: []picchu.ReleaseManagerRevisionStatus{
					{Tag: "old-rev", CurrentPercent: 15, PeakPercent: 100, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 163}},
				}},
			},
			scaleMin: &scaleMin,
			scaleMax: 320,
			expected: 163, // other revisions only; our 144 pods must NOT be included
		},
	} {
		t.Run(tc.name, func(t *ttesting.T) {
			i := createTestIncarnation("new-rev", releasing, 1, tc.opts...)
			i.revision.Spec.Targets[0].Scale.Min = tc.scaleMin
			i.revision.Spec.Targets[0].Scale.Max = tc.scaleMax
			if tc.name == "excludes our own pods to avoid over-allocation feedback loop" {
				i.status.Scale.Current = 144
			}
			assert.Equal(t, tc.expected, i.getBaseCapacityForRamp())
		})
	}
}

// TestGenScalePlanRampingByState verifies that Ramping is only true for canarying/releasing
// states. Released incarnations keep their HPA so it can scale down based on traffic.
func TestGenScalePlanRampingByState(t *ttesting.T) {
	cpuTarget := int32(70)
	scaleMax := int32(10)
	for _, tc := range []struct {
		name         string
		state        State
		expectRamp   bool
		hasAutoscale bool
	}{
		{"canarying ramps", canarying, true, true},
		{"releasing ramps", releasing, true, true},
		{"released does not ramp", released, false, true},
		{"deploying does not ramp", deploying, false, true},
		{"deployed does not ramp", deployed, false, true},
		{"pendingrelease does not ramp", pendingrelease, false, true},
		{"no autoscaler never ramps", released, false, false},
	} {
		t.Run(tc.name, func(t *ttesting.T) {
			i := createTestIncarnation("test", tc.state, 50)
			if tc.hasAutoscale {
				i.revision.Spec.Targets[0].Scale.TargetCPUUtilizationPercentage = &cpuTarget
				i.revision.Spec.Targets[0].Scale.Max = scaleMax
			}
			plan := i.genScalePlan(context.Background())
			assert.Equal(t, tc.expectRamp, plan.Ramping, "state=%s", tc.state)
		})
	}
}

// TestGenScalePlanReleasedKeepsHPA verifies that released revisions keep normal HPA
// min/max (Scale.Min, Scale.Max). HPA downscales based on observed load, not preemptive pinning.
func TestGenScalePlanReleasedKeepsHPA(t *ttesting.T) {
	scaleMin := int32(16)
	scaleMax := int32(320)
	i := createTestIncarnation("old-rev", released, 15, &testClusters{Clusters: 4},
		withOtherRevisions{
			Revisions: []picchu.ReleaseManagerRevisionStatus{
				{Tag: "new-rev", CurrentPercent: 85, PeakPercent: 85, Scale: picchu.ReleaseManagerRevisionScaleStatus{Current: 144}},
			},
		})
	i.revision.Spec.Targets[0].Scale.Min = &scaleMin
	i.revision.Spec.Targets[0].Scale.Max = scaleMax
	i.revision.Spec.Targets[0].Scale.TargetCPUUtilizationPercentage = ptr(int32(70))

	plan := i.genScalePlan(context.Background())

	// Released gets full Scale.Min to Scale.Max; HPA downscales naturally based on CPU load.
	assert.Equal(t, i.controller.divideReplicas(16, 100), plan.Min, "released keeps full Scale.Min")
	assert.Equal(t, i.controller.divideReplicas(320, 100), plan.Max, "released keeps full Scale.Max")
}

func ptr[T any](v T) *T { return &v }

func Test_IsExpired(t *ttesting.T) {
	for _, test := range []struct {
		Name string
22 changes: 22 additions & 0 deletions controllers/plan/scaleRevision.go
@@ -37,9 +37,31 @@ type ScaleRevision struct {
	KedaWorker         *picchuv1alpha1.KedaScaleInfo
	EventDriven        bool
	ServiceAccountName string
	Ramping            bool // When true, delete autoscaler so Picchu can control ReplicaSet replicas directly
}

func (p *ScaleRevision) Apply(ctx context.Context, cli client.Client, cluster *picchuv1alpha1.Cluster, log logr.Logger) error {
	if p.Ramping {
		// Delete autoscalers so Picchu can control ReplicaSet replicas directly during ramp
		objects := []client.Object{
			&autoscaling.HorizontalPodAutoscaler{
				ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace},
			},
			&kedav1.ScaledObject{
				ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace},
			},
			&wpav1.WorkerPodAutoScaler{
				ObjectMeta: metav1.ObjectMeta{Name: p.Tag, Namespace: p.Namespace},
			},
		}
		for _, obj := range objects {
			if err := utils.DeleteIfExists(ctx, cli, obj); err != nil {
				return err
			}
		}
		return nil
	}

	if p.Min > p.Max {
		p.Max = p.Min
	}
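Note: utils.DeleteIfExists is not shown in this diff. A minimal sketch of the pattern it presumably implements, using controller-runtime's client.IgnoreNotFound so that deleting an autoscaler that was never created (or is already gone) is a no-op (name and signature are assumptions):

package utils

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// DeleteIfExists issues a Delete and swallows NotFound errors, so removing an
// HPA/ScaledObject/WPA that does not exist succeeds silently.
// Sketch only; the real helper in this repo may differ.
func DeleteIfExists(ctx context.Context, cli client.Client, obj client.Object) error {
	return client.IgnoreNotFound(cli.Delete(ctx, obj))
}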