26 changes: 26 additions & 0 deletions pkg/controllers/disruption/consolidation.go
@@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/karpenter/pkg/utils/pretty"

@@ -131,6 +132,11 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
//
// nolint:gocyclo
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
candidateNames := lo.Map(candidates, func(cn *Candidate, _ int) string { return cn.Name() })
log.FromContext(ctx).Info("computeConsolidation started",
"candidateCount", len(candidates),
"candidates", candidateNames)

var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
@@ -144,6 +150,9 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

// if not all of the pods were scheduled, we can't do anything
if !results.AllNonPendingPodsScheduled() {
log.FromContext(ctx).Info("computeConsolidation: not all pods scheduled",
"podErrors", len(results.PodErrors),
"errorSummary", results.NonPendingPodSchedulingErrors())
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, pretty.Sentence(results.NonPendingPodSchedulingErrors()))...)
@@ -153,6 +162,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

// were we able to schedule all the pods on the inflight candidates?
if len(results.NewNodeClaims) == 0 {
log.FromContext(ctx).Info("computeConsolidation: DELETE decision (no new nodes needed)",
"candidateCount", len(candidates))
return Command{
Candidates: candidates,
Results: results,
@@ -161,6 +172,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...

// we're not going to turn a single node into multiple candidates
if len(results.NewNodeClaims) != 1 {
log.FromContext(ctx).Info("computeConsolidation: cannot consolidate (would create multiple nodes)",
"newNodeClaimsNeeded", len(results.NewNodeClaims))
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
}
@@ -202,6 +215,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
return Command{}, nil
}
if len(results.NewNodeClaims[0].InstanceTypeOptions) == 0 {
log.FromContext(ctx).Info("computeConsolidation: no cheaper replacement found",
"candidatePrice", candidatePrice)
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
@@ -218,6 +233,17 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
}

// Log the REPLACE decision
replacementOptions := lo.Map(results.NewNodeClaims[0].InstanceTypeOptions, func(it *cloudprovider.InstanceType, _ int) string { return it.Name })
if len(replacementOptions) > 5 {
replacementOptions = replacementOptions[:5]
}
log.FromContext(ctx).Info("computeConsolidation: REPLACE decision",
"candidateCount", len(candidates),
"candidatePrice", candidatePrice,
"replacementOptions", replacementOptions,
"totalReplacementOptions", len(results.NewNodeClaims[0].InstanceTypeOptions))

return Command{
Candidates: candidates,
Replacements: replacementsFromNodeClaims(results.NewNodeClaims...),
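The hunks above call log.FromContext(ctx) separately for every new message. A minimal sketch, not part of this change, of binding the shared candidate context once with WithValues so each decision log in computeConsolidation carries the same fields (the logger variable name is illustrative):

	// Sketch only: bind the candidate context once and reuse the bound logger.
	logger := log.FromContext(ctx).WithValues(
		"candidateCount", len(candidates),
		"candidates", candidateNames,
	)
	logger.Info("computeConsolidation started")
	// ...later branches reuse the same bound logger...
	logger.Info("computeConsolidation: DELETE decision (no new nodes needed)")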
39 changes: 39 additions & 0 deletions pkg/controllers/disruption/helpers.go
@@ -59,6 +59,14 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return !candidateNames.Has(n.Name())
})

// Verbose logging for simulation debugging
log.FromContext(ctx).Info("SimulateScheduling started",
"candidateCount", len(candidates),
"candidateNames", candidateNames.UnsortedList(),
"totalNodes", len(nodes),
"activeNodes", len(stateNodes),
"deletingNodes", len(deletingNodes))

// We do one final check to ensure that the node that we are attempting to consolidate isn't
// already handled for deletion by some other controller. This could happen if the node was markedForDeletion
// between returning the candidates and getting the stateNodes above
@@ -73,6 +81,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
if err != nil {
return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err)
}
log.FromContext(ctx).Info("GetPendingPods completed", "pendingPodCount", len(pods))

// Don't provision capacity for pods which will not get evicted due to fully blocking PDBs.
// Since Karpenter doesn't know when these pods will be successfully evicted, spinning up capacity until
@@ -85,6 +94,11 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
currentlyReschedulablePods := lo.Filter(n.reschedulablePods, func(p *corev1.Pod, _ int) bool {
return pdbs.IsCurrentlyReschedulable(p)
})
log.FromContext(ctx).Info("Candidate pods to reschedule",
"nodePool", n.NodePool.Name,
"candidate", n.Name(),
"totalPods", len(n.reschedulablePods),
"reschedulablePods", len(currentlyReschedulablePods))
pods = append(pods, currentlyReschedulablePods...)
}

@@ -114,11 +128,36 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return client.ObjectKeyFromObject(p), nil
})

log.FromContext(ctx).Info("Running scheduler.Solve",
"totalPodsToSchedule", len(pods),
"availableNodes", len(stateNodes))

results, err := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods)
if err != nil {
return scheduling.Results{}, fmt.Errorf("scheduling pods, %w", err)
}
results = results.TruncateInstanceTypes(ctx, scheduling.MaxInstanceTypes)

log.FromContext(ctx).Info("Scheduler.Solve completed",
"newNodeClaimsNeeded", len(results.NewNodeClaims),
"existingNodesUsed", len(results.ExistingNodes),
"podErrors", len(results.PodErrors))

// Log detailed info about each new NodeClaim and which pods require it
for i, nc := range results.NewNodeClaims {
podNames := lo.Map(nc.Pods, func(p *corev1.Pod, _ int) string {
return fmt.Sprintf("%s/%s", p.Namespace, p.Name)
})
instanceTypeNames := lo.Map(nc.InstanceTypeOptions[:min(5, len(nc.InstanceTypeOptions))], func(it *cloudprovider.InstanceType, _ int) string {
return it.Name
})
log.FromContext(ctx).Info("NewNodeClaim details",
"index", i,
"podCount", len(nc.Pods),
"pods", podNames,
"topInstanceTypes", instanceTypeNames,
"totalInstanceTypeOptions", len(nc.InstanceTypeOptions))
}
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
// tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them
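Both of the new instance-type log lines cap the list at five entries, once with an explicit length check in consolidation.go and once with a min-bounded slice in helpers.go. A small hypothetical helper, not present in this diff, could keep the two call sites consistent:

	// topNames returns the names of at most n instance types, for compact logging.
	// Hypothetical helper; the diff inlines this logic at each call site instead.
	func topNames(opts []*cloudprovider.InstanceType, n int) []string {
		if len(opts) < n {
			n = len(opts)
		}
		return lo.Map(opts[:n], func(it *cloudprovider.InstanceType, _ int) string { return it.Name })
	}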
46 changes: 43 additions & 3 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -49,7 +49,12 @@ func NewMultiNodeConsolidation(c consolidation, opts ...option.Function[MethodOp
}

func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, error) {
log.FromContext(ctx).Info("MultiNodeConsolidation.ComputeCommand started",
"totalCandidates", len(candidates),
"budgetMapping", disruptionBudgetMapping)

if m.IsConsolidated() {
log.FromContext(ctx).Info("MultiNodeConsolidation: already consolidated, skipping")
return Command{}, nil
}
candidates = m.sortCandidates(candidates)
@@ -85,6 +90,28 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
// This could be further configurable in the future.
maxParallel := lo.Clamp(len(disruptableCandidates), 0, 100)

log.FromContext(ctx).Info("MultiNodeConsolidation: filtered candidates",
"disruptableCandidates", len(disruptableCandidates),
"constrainedByBudgets", constrainedByBudgets,
"maxParallel", maxParallel)

// Log candidate details
for i, c := range disruptableCandidates {
if i < 20 { // Only log first 20 to avoid spam
log.FromContext(ctx).Info("MultiNodeConsolidation candidate",
"nodePool", c.NodePool.Name,
"index", i,
"node", c.Name(),
"instanceType", c.instanceType.Name,
"pods", len(c.reschedulablePods),
"disruptionCost", c.DisruptionCost)
}
}
if len(disruptableCandidates) > 20 {
log.FromContext(ctx).Info("MultiNodeConsolidation: ... and more candidates",
"remaining", len(disruptableCandidates)-20)
}

cmd, err := m.firstNConsolidationOption(ctx, disruptableCandidates, maxParallel)
if err != nil {
return Command{}, err
@@ -102,7 +129,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB

if cmd, err = m.validator.Validate(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid")
log.FromContext(ctx).WithValues(cmd.LogValues()...).Info("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid")
return Command{}, nil
}
return Command{}, fmt.Errorf("validating consolidation, %w", err)
@@ -131,17 +158,21 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
mid := (min + max) / 2
candidatesToConsolidate := candidates[0 : mid+1]

log.FromContext(ctx).Info("MultiNodeConsolidation: binary search iteration",
"min", min, "max", max, "mid", mid,
"candidatesToConsolidate", len(candidatesToConsolidate))

// Pass the timeout context to ensure sub-operations can be canceled
cmd, err := m.computeConsolidation(timeoutCtx, candidatesToConsolidate...)
// context deadline exceeded will return to the top of the loop and either return nothing or the last saved command
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
ConsolidationTimeoutsTotal.Inc(map[string]string{ConsolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.Candidates == nil {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
log.FromContext(ctx).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
return Command{}, nil
}
log.FromContext(ctx).V(1).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
log.FromContext(ctx).WithValues(lastSavedCommand.LogValues()...).Info("stopping multi-node consolidation after timeout, returning last valid command")
return lastSavedCommand, nil

}
@@ -159,12 +190,21 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
}
if validDecision {
// We can consolidate NodeClaims [0,mid]
log.FromContext(ctx).Info("MultiNodeConsolidation: valid consolidation found",
"decision", cmd.Decision(),
"candidateCount", len(candidatesToConsolidate),
"replacements", len(cmd.Replacements))
lastSavedCommand = cmd
min = mid + 1
} else {
log.FromContext(ctx).Info("MultiNodeConsolidation: consolidation not valid, reducing batch",
"decision", cmd.Decision())
max = mid - 1
}
}
log.FromContext(ctx).Info("MultiNodeConsolidation: binary search complete",
"foundCommand", lastSavedCommand.Candidates != nil,
"candidatesInCommand", len(lastSavedCommand.Candidates))
return lastSavedCommand, nil
}

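The iteration logging added to firstNConsolidationOption instruments a binary search for the largest consolidatable prefix of the sorted candidates. A standalone sketch of that pattern, with a placeholder predicate in place of the real scheduling simulation, validation, and timeout handling:

	// largestValidPrefix returns the longest prefix of items accepted by valid.
	// Sketch only: the real loop calls computeConsolidation and saves the last
	// valid Command (lastSavedCommand) rather than the slice itself.
	func largestValidPrefix[T any](items []T, valid func([]T) bool) []T {
		var best []T
		low, high := 0, len(items)-1
		for low <= high {
			mid := (low + high) / 2
			batch := items[:mid+1]
			if valid(batch) {
				best = batch // items[0..mid] consolidate together; try a larger batch
				low = mid + 1
			} else {
				high = mid - 1 // too many nodes at once; shrink the batch
			}
		}
		return best
	}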
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/queue.go
@@ -168,7 +168,7 @@ func (q *Queue) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconci
// Log the error
log.FromContext(ctx).Error(multiErr, "failed terminating nodes while executing a disruption command")
} else {
log.FromContext(ctx).V(1).Info("command succeeded")
log.FromContext(ctx).Info("command succeeded")
cmd.Succeeded = true
}
q.CompleteCommand(cmd)
31 changes: 28 additions & 3 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -54,7 +54,12 @@ func NewSingleNodeConsolidation(c consolidation, opts ...option.Function[MethodO
// ComputeCommand generates a disruption command given candidates
// nolint:gocyclo
func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, error) {
log.FromContext(ctx).Info("SingleNodeConsolidation.ComputeCommand started",
"totalCandidates", len(candidates),
"budgetMapping", disruptionBudgetMapping)

if s.IsConsolidated() {
log.FromContext(ctx).Info("SingleNodeConsolidation: already consolidated, skipping")
return Command{}, nil
}
candidates = s.SortCandidates(ctx, candidates)
@@ -68,7 +73,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
for i, candidate := range candidates {
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.Inc(map[string]string{ConsolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
log.FromContext(ctx).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))

s.PreviouslyUnseenNodePools = unseenNodePools

@@ -88,21 +93,41 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
// assume that it was due to budgets. If we don't filter out budgets, users who set a budget for `empty`
// can find their nodes disrupted here.
if len(candidate.reschedulablePods) == 0 {
log.FromContext(ctx).Info("SingleNodeConsolidation: skipping empty candidate",
"nodePool", candidate.NodePool.Name,
"candidate", candidate.Name())
continue
}

log.FromContext(ctx).Info("SingleNodeConsolidation: evaluating candidate",
"nodePool", candidate.NodePool.Name,
"index", i,
"candidate", candidate.Name(),
"instanceType", candidate.instanceType.Name,
"pods", len(candidate.reschedulablePods),
"disruptionCost", candidate.DisruptionCost)

// compute a possible consolidation option
cmd, err := s.computeConsolidation(ctx, candidate)
if err != nil {
log.FromContext(ctx).Error(err, "failed computing consolidation")
continue
}
if cmd.Decision() == NoOpDecision {
log.FromContext(ctx).Info("SingleNodeConsolidation: NoOp decision for candidate",
"nodePool", candidate.NodePool.Name,
"candidate", candidate.Name())
continue
}

log.FromContext(ctx).Info("SingleNodeConsolidation: found consolidation option",
"nodePool", candidate.NodePool.Name,
"candidate", candidate.Name(),
"decision", cmd.Decision(),
"replacements", len(cmd.Replacements))
if _, err = s.validator.Validate(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning single-node consolidation attempt due to pod churn, command is no longer valid")
log.FromContext(ctx).WithValues(cmd.LogValues()...).Info("abandoning single-node consolidation attempt due to pod churn, command is no longer valid")
return Command{}, nil
}
return Command{}, fmt.Errorf("validating consolidation, %w", err)
@@ -150,7 +175,7 @@ func (s *SingleNodeConsolidation) shuffleCandidates(ctx context.Context, nodePoo
var result []*Candidate
// Log any timed out nodepools that we're prioritizing
if s.PreviouslyUnseenNodePools.Len() != 0 {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %s", strings.Join(s.PreviouslyUnseenNodePools.UnsortedList(), ", ")))
log.FromContext(ctx).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %s", strings.Join(s.PreviouslyUnseenNodePools.UnsortedList(), ", ")))
}
sortedNodePools := s.PreviouslyUnseenNodePools.UnsortedList()
sortedNodePools = append(sortedNodePools, lo.Filter(lo.Keys(nodePoolCandidates), func(nodePoolName string, _ int) bool {
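The log statements added to SingleNodeConsolidation.ComputeCommand trace a first-fit pass over the sorted candidates. A condensed sketch of that control flow, with simulate and validate standing in for computeConsolidation and validator.Validate, and with the timeout and non-validation error branches simplified away:

	// Sketch of the single-node pass the new logging instruments.
	for _, candidate := range candidates {
		if len(candidate.reschedulablePods) == 0 {
			continue // empty nodes are left to the emptiness disruption path
		}
		cmd, err := simulate(ctx, candidate)
		if err != nil {
			continue // a failed simulation for one node should not stop the pass
		}
		if cmd.Decision() == NoOpDecision {
			continue // nothing cheaper or removable for this node
		}
		if _, err := validate(ctx, cmd); err != nil {
			return Command{}, nil // pod churn invalidated the command
		}
		return cmd, nil // first validated single-node command wins
	}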