Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
Expand Down Expand Up @@ -131,6 +133,20 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
//
// nolint:gocyclo
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
// Build NodePool summary
nodePoolCounts := make(map[string]int)
for _, cn := range candidates {
nodePoolCounts[cn.NodePool.Name]++
}
nodePoolSummary := lo.MapToSlice(nodePoolCounts, func(np string, count int) string {
return fmt.Sprintf("%s:%d", np, count)
})
candidateNames := lo.Map(candidates, func(cn *Candidate, _ int) string { return cn.Name() })
log.FromContext(ctx).Info("computeConsolidation started",
"candidateCount", len(candidates),
"candidateNodePools", nodePoolSummary,
"candidates", candidateNames)

var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
Expand All @@ -144,15 +160,23 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

// if not all of the pods were scheduled, we can't do anything
if !results.AllNonPendingPodsScheduled() {
log.FromContext(ctx).Info("computeConsolidation: not all pods scheduled",
"podErrors", len(results.PodErrors),
"errorSummary", results.NonPendingPodSchedulingErrors())
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "PodsNotSchedulable"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, pretty.Sentence(results.NonPendingPodSchedulingErrors()))...)
} else {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: MultiNodeConsolidationType, metrics.ReasonLabel: "PodsNotSchedulable"})
}
return Command{}, nil
}

// were we able to schedule all the pods on the inflight candidates?
if len(results.NewNodeClaims) == 0 {
log.FromContext(ctx).Info("computeConsolidation: DELETE decision (no new nodes needed)",
"candidateCount", len(candidates))
return Command{
Candidates: candidates,
Results: results,
Expand All @@ -161,8 +185,13 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

// we're not going to turn a single node into multiple candidates
if len(results.NewNodeClaims) != 1 {
log.FromContext(ctx).Info("computeConsolidation: cannot consolidate (would create multiple nodes)",
"newNodeClaimsNeeded", len(results.NewNodeClaims))
if len(candidates) == 1 {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "MultipleNodesNeeded"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
} else {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: MultiNodeConsolidationType, metrics.ReasonLabel: "MultipleNodesNeeded"})
}
return Command{}, nil
}
Expand Down Expand Up @@ -202,8 +231,13 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
return Command{}, nil
}
if len(results.NewNodeClaims[0].InstanceTypeOptions) == 0 {
log.FromContext(ctx).Info("computeConsolidation: no cheaper replacement found",
"candidatePrice", candidatePrice)
if len(candidates) == 1 {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "NoCheaperReplacement"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
} else {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: MultiNodeConsolidationType, metrics.ReasonLabel: "NoCheaperReplacement"})
}
return Command{}, nil
}
Expand All @@ -218,6 +252,17 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
}

// Log the REPLACE decision
replacementOptions := lo.Map(results.NewNodeClaims[0].InstanceTypeOptions, func(it *cloudprovider.InstanceType, _ int) string { return it.Name })
if len(replacementOptions) > 5 {
replacementOptions = replacementOptions[:5]
}
log.FromContext(ctx).Info("computeConsolidation: REPLACE decision",
"candidateCount", len(candidates),
"candidatePrice", candidatePrice,
"replacementOptions", replacementOptions,
"totalReplacementOptions", len(results.NewNodeClaims[0].InstanceTypeOptions))

return Command{
Candidates: candidates,
Replacements: replacementsFromNodeClaims(results.NewNodeClaims...),
Expand All @@ -235,7 +280,10 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
// Spot consolidation is turned off.
if !options.FromContext(ctx).FeatureGates.SpotToSpotConsolidation {
if len(candidates) == 1 {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "SpotToSpotConsolidationDisabled"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "SpotToSpotConsolidation is disabled, can't replace a spot node with a spot node")...)
} else {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: MultiNodeConsolidationType, metrics.ReasonLabel: "SpotToSpotConsolidationDisabled"})
}
return Command{}, nil
}
Expand All @@ -256,7 +304,10 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
}
if len(results.NewNodeClaims[0].InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "NoCheaperReplacement"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
} else {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: MultiNodeConsolidationType, metrics.ReasonLabel: "NoCheaperReplacement"})
}
return Command{}, nil
}
Expand All @@ -277,6 +328,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
// 1) The current candidate is not in the set of the 15 cheapest instance types and
// 2) There were at least 15 options cheaper than the current candidate.
if len(results.NewNodeClaims[0].InstanceTypeOptions) < MinInstanceTypesForSpotToSpotConsolidation {
ConsolidationSkippedTotal.Inc(map[string]string{ConsolidationTypeLabel: SingleNodeConsolidationType, metrics.ReasonLabel: "InsufficientSpotTypesForSpotToSpot"})
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("SpotToSpotConsolidation requires %d cheaper instance type options than the current candidate to consolidate, got %d",
MinInstanceTypesForSpotToSpotConsolidation, len(results.NewNodeClaims[0].InstanceTypeOptions)))...)
return Command{}, nil
Expand Down
50 changes: 50 additions & 0 deletions pkg/controllers/disruption/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return !candidateNames.Has(n.Name())
})

// Build NodePool summary for candidates
nodePoolCounts := make(map[string]int)
for _, c := range candidates {
nodePoolCounts[c.NodePool.Name]++
}
nodePoolSummary := lo.MapToSlice(nodePoolCounts, func(np string, count int) string {
return fmt.Sprintf("%s:%d", np, count)
})

// Verbose logging for simulation debugging
log.FromContext(ctx).Info("SimulateScheduling started",
"candidateCount", len(candidates),
"candidateNodePools", nodePoolSummary,
"candidateNames", candidateNames.UnsortedList(),
"totalNodes", len(nodes),
"activeNodes", len(stateNodes),
"deletingNodes", len(deletingNodes))

// We do one final check to ensure that the node that we are attempting to consolidate isn't
// already handled for deletion by some other controller. This could happen if the node was markedForDeletion
// between returning the candidates and getting the stateNodes above
Expand All @@ -73,6 +91,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
if err != nil {
return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
}
log.FromContext(ctx).Info("GetPendingPods completed", "pendingPodCount", len(pods))

// Don't provision capacity for pods which will not get evicted due to fully blocking PDBs.
// Since Karpenter doesn't know when these pods will be successfully evicted, spinning up capacity until
Expand All @@ -85,6 +104,11 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
currentlyReschedulablePods := lo.Filter(n.reschedulablePods, func(p *corev1.Pod, _ int) bool {
return pdbs.IsCurrentlyReschedulable(p)
})
log.FromContext(ctx).Info("Candidate pods to reschedule",
"nodePool", n.NodePool.Name,
"candidate", n.Name(),
"totalPods", len(n.reschedulablePods),
"reschedulablePods", len(currentlyReschedulablePods))
pods = append(pods, currentlyReschedulablePods...)
}

Expand Down Expand Up @@ -114,11 +138,37 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return client.ObjectKeyFromObject(p), nil
})

log.FromContext(ctx).Info("Running scheduler.Solve",
"totalPodsToSchedule", len(pods),
"availableNodes", len(stateNodes))

results, err := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods)
if err != nil {
return scheduling.Results{}, fmt.Errorf("scheduling pods, %w", err)
}
results = results.TruncateInstanceTypes(ctx, scheduling.MaxInstanceTypes)

log.FromContext(ctx).Info("Scheduler.Solve completed",
"newNodeClaimsNeeded", len(results.NewNodeClaims),
"existingNodesUsed", len(results.ExistingNodes),
"podErrors", len(results.PodErrors))

// Log detailed info about each new NodeClaim and which pods require it
for i, nc := range results.NewNodeClaims {
podNames := lo.Map(nc.Pods, func(p *corev1.Pod, _ int) string {
return fmt.Sprintf("%s/%s", p.Namespace, p.Name)
})
instanceTypeNames := lo.Map(nc.InstanceTypeOptions[:min(5, len(nc.InstanceTypeOptions))], func(it *cloudprovider.InstanceType, _ int) string {
return it.Name
})
log.FromContext(ctx).Info("NewNodeClaim details",
"index", i,
"candidateNodePools", nodePoolSummary,
"podCount", len(nc.Pods),
"pods", podNames,
"topInstanceTypes", instanceTypeNames,
"totalInstanceTypeOptions", len(nc.InstanceTypeOptions))
}
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
Expand Down
20 changes: 20 additions & 0 deletions pkg/controllers/disruption/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ var (
},
[]string{decisionLabel, metrics.ReasonLabel, ConsolidationTypeLabel},
)
ConsolidationSkippedTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Name: "consolidation_skipped_total",
Help: "Number of times consolidation was skipped. Labeled by reason and consolidation type.",
},
[]string{metrics.ReasonLabel, ConsolidationTypeLabel},
)
EligibleNodes = opmetrics.NewPrometheusGauge(
crmetrics.Registry,
prometheus.GaugeOpts{
Expand Down Expand Up @@ -88,6 +98,16 @@ var (
},
[]string{ConsolidationTypeLabel},
)
FailedValidationsCommandTotal = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
Name: "failed_validations_command_total",
Help: "Number of command validations that failed. Labeled by consolidation type.",
},
[]string{ConsolidationTypeLabel},
)
NodePoolAllowedDisruptions = opmetrics.NewPrometheusGauge(
crmetrics.Registry,
prometheus.GaugeOpts{
Expand Down
58 changes: 55 additions & 3 deletions pkg/controllers/disruption/multinodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ func NewMultiNodeConsolidation(c consolidation, opts ...option.Function[MethodOp
}

func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, error) {
log.FromContext(ctx).Info("MultiNodeConsolidation.ComputeCommand started",
"totalCandidates", len(candidates),
"budgetMapping", disruptionBudgetMapping)

if m.IsConsolidated() {
log.FromContext(ctx).Info("MultiNodeConsolidation: already consolidated, skipping")
return Command{}, nil
}
candidates = m.sortCandidates(candidates)
Expand Down Expand Up @@ -85,6 +90,28 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
// This could be further configurable in the future.
maxParallel := lo.Clamp(len(disruptableCandidates), 0, 100)

log.FromContext(ctx).Info("MultiNodeConsolidation: filtered candidates",
"disruptableCandidates", len(disruptableCandidates),
"constrainedByBudgets", constrainedByBudgets,
"maxParallel", maxParallel)

// Log candidate details
for i, c := range disruptableCandidates {
if i < 20 { // Only log first 20 to avoid spam
log.FromContext(ctx).Info("MultiNodeConsolidation candidate",
"nodePool", c.NodePool.Name,
"index", i,
"node", c.Name(),
"instanceType", c.instanceType.Name,
"pods", len(c.reschedulablePods),
"disruptionCost", c.DisruptionCost)
}
}
if len(disruptableCandidates) > 20 {
log.FromContext(ctx).Info("MultiNodeConsolidation: ... and more candidates",
"remaining", len(disruptableCandidates)-20)
}

cmd, err := m.firstNConsolidationOption(ctx, disruptableCandidates, maxParallel)
if err != nil {
return Command{}, err
Expand All @@ -102,7 +129,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

if cmd, err = m.validator.Validate(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid")
log.FromContext(ctx).WithValues(cmd.LogValues()...).Info("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid")
return Command{}, nil
}
return Command{}, fmt.Errorf("validating consolidation, %w", err)
Expand Down Expand Up @@ -131,17 +158,31 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
mid := (min + max) / 2
candidatesToConsolidate := candidates[0 : mid+1]

// Build NodePool summary for this batch
nodePoolCounts := make(map[string]int)
for _, c := range candidatesToConsolidate {
nodePoolCounts[c.NodePool.Name]++
}
nodePoolSummary := lo.MapToSlice(nodePoolCounts, func(np string, count int) string {
return fmt.Sprintf("%s:%d", np, count)
})

log.FromContext(ctx).Info("MultiNodeConsolidation: binary search iteration",
"min", min, "max", max, "mid", mid,
"candidatesToConsolidate", len(candidatesToConsolidate),
"nodePoolsInBatch", nodePoolSummary)

// Pass the timeout context to ensure sub-operations can be canceled
cmd, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
// context deadline exceeded will return to the top of the loop and either return nothing or the last saved command
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
ConsolidationTimeoutsTotal.Inc(map[string]string{ConsolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.Candidates == nil {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
log.FromContext(ctx).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
return Command{}, nil
}
log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
log.FromContext(ctx).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
return lastSavedCommand, nil

}
Expand All @@ -159,12 +200,23 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
}
if validDecision {
// We can consolidate NodeClaims [0,mid]
log.FromContext(ctx).Info("MultiNodeConsolidation: valid consolidation found",
"decision", cmd.Decision(),
"candidateCount", len(candidatesToConsolidate),
"nodePoolsInBatch", nodePoolSummary,
"replacements", len(cmd.Replacements))
lastSavedCommand = cmd
min = mid + 1
} else {
log.FromContext(ctx).Info("MultiNodeConsolidation: consolidation not valid, reducing batch",
"decision", cmd.Decision(),
"nodePoolsInBatch", nodePoolSummary)
max = mid - 1
}
}
log.FromContext(ctx).Info("MultiNodeConsolidation: binary search complete",
"foundCommand", lastSavedCommand.Candidates != nil,
"candidatesInCommand", len(lastSavedCommand.Candidates))
return lastSavedCommand, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (q *Queue) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconci
// Log the error
log.FromContext(ctx).Error(multiErr, "failed terminating nodes while executing a disruption command")
} else {
log.FromContext(ctx).V(1).Info("command succeeded")
log.FromContext(ctx).Info("command succeeded")
cmd.Succeeded = true
}
q.CompleteCommand(cmd)
Expand Down
Loading