diff --git a/cluster-autoscaler/builder/autoscaler.go b/cluster-autoscaler/builder/autoscaler.go new file mode 100644 index 000000000000..2df8fd704d5f --- /dev/null +++ b/cluster-autoscaler/builder/autoscaler.go @@ -0,0 +1,288 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "context" + "fmt" + capacityclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client" + "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common" + capacitybuffer "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/controller" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" + ca_context "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core" + coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options" + "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/loop" + "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + cbprocessor "k8s.io/autoscaler/cluster-autoscaler/processors/capacitybuffer" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection" + 
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" + "k8s.io/autoscaler/cluster-autoscaler/processors/pods" + "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" + "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" + "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" + "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" + provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +// AutoscalerBuilder is the builder object for creating a Cluster Autoscaler instance. +type AutoscalerBuilder struct { + options config.AutoscalingOptions + + kubeClient kubernetes.Interface + listerRegistry kube_util.ListerRegistry + podObserver *loop.UnschedulablePodObserver + cloudProvider cloudprovider.CloudProvider + informerFactory informers.SharedInformerFactory +} + +// New creates a builder with default options. 
+func New(opts config.AutoscalingOptions) *AutoscalerBuilder { + return &AutoscalerBuilder{ + options: opts, + } +} + +// WithKubeClient allows injecting a FakeK8s client. +func (b *AutoscalerBuilder) WithKubeClient(client kubernetes.Interface) *AutoscalerBuilder { + b.kubeClient = client + return b +} + +// WithListerRegistry allows injecting a fake ListerRegistry. +func (b *AutoscalerBuilder) WithListerRegistry(registry kube_util.ListerRegistry) *AutoscalerBuilder { + b.listerRegistry = registry + return b +} + +// WithPodObserver allows injecting a pod observer. +func (b *AutoscalerBuilder) WithPodObserver(podObserver *loop.UnschedulablePodObserver) *AutoscalerBuilder { + b.podObserver = podObserver + return b +} + +// WithCloudProvider allows injecting a cloud provider. +func (b *AutoscalerBuilder) WithCloudProvider(cloudProvider cloudprovider.CloudProvider) *AutoscalerBuilder { + b.cloudProvider = cloudProvider + return b +} + +// WithInformerFactory allows injecting a shared informer factory. +func (b *AutoscalerBuilder) WithInformerFactory(f informers.SharedInformerFactory) *AutoscalerBuilder { + b.informerFactory = f + return b +} + +// Build constructs the Autoscaler based on the provided configuration. +func (b *AutoscalerBuilder) Build(ctx context.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) { + // Get AutoscalingOptions from flags. 
+ autoscalingOptions := b.options + + if b.kubeClient == nil { + return nil, nil, fmt.Errorf("kubeClient is missing: ensure WithKubeClient() is called") + } + if b.informerFactory == nil { + return nil, nil, fmt.Errorf("informerFactory is missing: ensure WithInformerFactory() is called") + } + + fwHandle, err := framework.NewHandle(ctx, b.informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled) + if err != nil { + return nil, nil, err + } + deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) + drainabilityRules := rules.Default(deleteOptions) + + var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism) + opts := coreoptions.AutoscalerOptions{ + AutoscalingOptions: autoscalingOptions, + FrameworkHandle: fwHandle, + ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled), + KubeClient: b.kubeClient, + InformerFactory: b.informerFactory, + DebuggingSnapshotter: debuggingSnapshotter, + DeleteOptions: deleteOptions, + DrainabilityRules: drainabilityRules, + ScaleUpOrchestrator: orchestrator.New(), + } + + opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) + opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(&autoscalingOptions.NodeInfoCacheExpireTime, autoscalingOptions.ForceDaemonSets) + podListProcessor := podlistprocessor.NewDefaultPodListProcessor(scheduling.ScheduleAnywhere) + + var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector + if autoscalingOptions.ProvisioningRequestEnabled { + podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) + + restConfig := 
kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + client, err := provreqclient.NewProvisioningRequestClient(restConfig) + if err != nil { + return nil, nil, err + } + + ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.CheckCapacityProcessorInstance) + if err != nil { + return nil, nil, err + } + podListProcessor.AddProcessor(ProvisioningRequestInjector) + + var provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector + if autoscalingOptions.CheckCapacityBatchProcessing { + klog.Infof("Batch processing for check capacity requests is enabled. Passing provisioning request injector to check capacity processor.") + provisioningRequestPodsInjector = ProvisioningRequestInjector + } + + provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{ + checkcapacity.New(client, provisioningRequestPodsInjector), + besteffortatomic.New(client), + }) + + scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) + opts.ScaleUpOrchestrator = scaleUpOrchestrator + provreqProcesor := provreq.NewProvReqProcessor(client, opts.CheckCapacityProcessorInstance) + opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor}) + + podListProcessor.AddProcessor(provreqProcesor) + + opts.Processors.ScaleUpEnforcer = provreq.NewProvisioningRequestScaleUpEnforcer() + } + + var capacitybufferClient *capacityclient.CapacityBufferClient + var capacitybufferClientError error + if autoscalingOptions.CapacitybufferControllerEnabled { + restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + capacitybufferClient, capacitybufferClientError = capacityclient.NewCapacityBufferClientFromConfig(restConfig) + if capacitybufferClientError == nil && capacitybufferClient 
!= nil { + nodeBufferController := capacitybuffer.NewDefaultBufferController(capacitybufferClient) + go nodeBufferController.Run(ctx.Done()) + } + } + + if autoscalingOptions.CapacitybufferPodInjectionEnabled { + if capacitybufferClient == nil { + restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + capacitybufferClient, capacitybufferClientError = capacityclient.NewCapacityBufferClientFromConfig(restConfig) + } + if capacitybufferClientError == nil && capacitybufferClient != nil { + buffersPodsRegistry := cbprocessor.NewDefaultCapacityBuffersFakePodsRegistry() + bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor( + capacitybufferClient, + []string{common.ActiveProvisioningStrategy}, + buffersPodsRegistry, true) + podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{bufferPodInjector, podListProcessor}) + opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{ + cbprocessor.NewFakePodsScaleUpStatusProcessor(buffersPodsRegistry), opts.Processors.ScaleUpStatusProcessor}) + } + } + + if autoscalingOptions.ProactiveScaleupEnabled { + podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry() + + podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry) + enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(autoscalingOptions.PodInjectionLimit) + + podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor}) + + // FakePodsScaleUpStatusProcessor processor needs to be the first processor in ScaleUpStatusProcessor before the default processor + // As it filters out fake pods from Scale Up status so that we don't emit events. 
+ opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry), opts.Processors.ScaleUpStatusProcessor}) + } + + opts.Processors.PodListProcessor = podListProcessor + sdCandidatesSorting := previouscandidates.NewPreviousCandidates() + scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{ + emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, drainabilityRules), + sdCandidatesSorting, + } + opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting) + + cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() + cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) + + if autoscalingOptions.ScaleDownDelayTypeLocal { + sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() + cp.Register(sdp) + opts.Processors.ScaleStateNotifier.Register(sdp) + + } + opts.Processors.ScaleDownNodeProcessor = cp + + // These metrics should be published only once. + metrics.UpdateCPULimitsCores(autoscalingOptions.MinCoresTotal, autoscalingOptions.MaxCoresTotal) + metrics.UpdateMemoryLimitsBytes(autoscalingOptions.MinMemoryTotal, autoscalingOptions.MaxMemoryTotal) + + // Initialize metrics. + metrics.InitMetrics() + + autoscalingKubeClients := ca_context.NewAutoscalingKubeClients(ctx, opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory) + // Use lister registry if provided. + if b.listerRegistry != nil { + autoscalingKubeClients.ListerRegistry = b.listerRegistry + } + opts.AutoscalingKubeClients = autoscalingKubeClients + + if b.cloudProvider != nil { + opts.CloudProvider = b.cloudProvider + } + + // Create autoscaler. 
+ autoscaler, err := core.NewAutoscaler(ctx, opts, b.informerFactory) + if err != nil { + return nil, nil, err + } + + b.informerFactory.Start(ctx.Done()) + + klog.Info("Waiting for caches to sync...") + synced := b.informerFactory.WaitForCacheSync(ctx.Done()) + for _, ok := range synced { + if !ok { + return nil, nil, fmt.Errorf("failed to sync informer caches") + } + } + + if b.podObserver == nil { + b.podObserver = loop.StartPodObserver(ctx, b.kubeClient) + } + + // A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a + // ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods + // also marks the ProvisioningRequest as accepted or failed. + trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, b.podObserver, autoscalingOptions.ScanInterval) + return autoscaler, trigger, nil +} diff --git a/cluster-autoscaler/builder/autoscaler_test.go b/cluster-autoscaler/builder/autoscaler_test.go new file mode 100644 index 000000000000..4dd921c48bc5 --- /dev/null +++ b/cluster-autoscaler/builder/autoscaler_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package builder + +import ( + "context" + "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/expander" + "k8s.io/autoscaler/cluster-autoscaler/loop" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "testing" + "testing/synctest" + "time" +) + +func TestAutoscalerBuilderNoError(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + options := config.AutoscalingOptions{ + CloudProviderName: "gce", + EstimatorName: estimator.BinpackingEstimatorName, + ExpanderNames: expander.LeastWasteExpanderName, + } + + debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(false) + kubeClient := fake.NewClientset() + + autoscaler, trigger, err := New(options). + WithKubeClient(kubeClient). + WithInformerFactory(informers.NewSharedInformerFactory(kubeClient, 0)). + WithCloudProvider(test.NewCloudProvider(nil)). + WithPodObserver(&loop.UnschedulablePodObserver{}). + Build(ctx, debuggingSnapshotter) + + assert.NoError(t, err) + assert.NotNil(t, autoscaler) + assert.NotNil(t, trigger) + + cancel() + + // Synctest drain: Background goroutines (like MetricAsyncRecorder) often use uninterruptible time.Sleep loops. + // In a synctest bubble, these are "durable" sleeps. We must advance the virtual clock to allow these goroutines to wake up, observe the + // closed context channel, and terminate gracefully. 
+ time.Sleep(1 * time.Second) + }) +} diff --git a/cluster-autoscaler/cloudprovider/builder/builder_all.go b/cluster-autoscaler/cloudprovider/builder/builder_all.go index e98cdc62f6a7..06724c9e4675 100644 --- a/cluster-autoscaler/cloudprovider/builder/builder_all.go +++ b/cluster-autoscaler/cloudprovider/builder/builder_all.go @@ -53,6 +53,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/volcengine" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/vultr" coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/client-go/informers" ) @@ -164,3 +166,22 @@ func buildCloudProvider(opts *coreoptions.AutoscalerOptions, } return nil } + +func buildNodeInfoComparator(opts *coreoptions.AutoscalerOptions) nodegroupset.NodeInfoComparator { + if len(opts.BalancingLabels) > 0 { + return nodegroupset.CreateLabelNodeInfoComparator(opts.BalancingLabels) + } + providerName := opts.CloudProviderName + switch providerName { + case cloudprovider.AzureProviderName: + return nodegroupset.CreateAzureNodeInfoComparator(opts.BalancingExtraIgnoredLabels, opts.NodeGroupSetRatios) + case cloudprovider.AwsProviderName: + opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAsgTagResourceNodeInfoProvider(&opts.NodeInfoCacheExpireTime, opts.ForceDaemonSets) + return nodegroupset.CreateAwsNodeInfoComparator(opts.BalancingExtraIgnoredLabels, opts.NodeGroupSetRatios) + case cloudprovider.GceProviderName: + opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAnnotationNodeInfoProvider(&opts.NodeInfoCacheExpireTime, opts.ForceDaemonSets) + return nodegroupset.CreateGceNodeInfoComparator(opts.BalancingExtraIgnoredLabels, opts.NodeGroupSetRatios) + default: + return nodegroupset.CreateGenericNodeInfoComparator(opts.BalancingExtraIgnoredLabels, opts.NodeGroupSetRatios) + } +} diff --git 
a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go index 51c435febded..ef5945489b70 100644 --- a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go +++ b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go @@ -20,6 +20,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ca_context "k8s.io/autoscaler/cluster-autoscaler/context" coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/client-go/informers" klog "k8s.io/klog/v2" @@ -44,6 +45,11 @@ func NewCloudProvider(opts *coreoptions.AutoscalerOptions, informerFactory infor } provider := buildCloudProvider(opts, do, rl, informerFactory) + + opts.Processors.NodeGroupSetProcessor = &nodegroupset.BalancingNodeGroupSetProcessor{ + Comparator: buildNodeInfoComparator(opts), + } + if provider != nil { return provider } diff --git a/cluster-autoscaler/cloudprovider/test/fake_cloud_provider.go b/cluster-autoscaler/cloudprovider/test/fake_cloud_provider.go new file mode 100644 index 000000000000..ce9c112a2cc8 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/test/fake_cloud_provider.go @@ -0,0 +1,393 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package test + +import ( + "fmt" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + fakek8s "k8s.io/autoscaler/cluster-autoscaler/utils/fake" + "sync" +) + +const ( + defaultMinSize = 0 + defaultMaxSize = 1000 +) + +// CloudProvider is a fake implementation of the cloudprovider interface for testing. +type CloudProvider struct { + sync.RWMutex + groups map[string]cloudprovider.NodeGroup + minLimits map[string]int64 + maxLimits map[string]int64 + // nodeToGroup tracks which node name belongs to which group ID. + nodeToGroup map[string]string + k8s *fakek8s.Kubernetes +} + +// CloudProviderOption defines a function to configure the CloudProvider. +type CloudProviderOption func(*CloudProvider) + +// NewCloudProvider creates a new instance of the fake CloudProvider. +func NewCloudProvider(k8s *fakek8s.Kubernetes) *CloudProvider { + return &CloudProvider{ + groups: make(map[string]cloudprovider.NodeGroup), + nodeToGroup: make(map[string]string), + minLimits: map[string]int64{ + cloudprovider.ResourceNameCores: 0, + cloudprovider.ResourceNameMemory: 0, + }, + maxLimits: map[string]int64{ + // Set to a effectively infinite number for tests. + cloudprovider.ResourceNameCores: 1000000, + cloudprovider.ResourceNameMemory: 1000000, + }, + k8s: k8s, + } +} + +// NodeGroups returns all node groups configured in the fake CloudProvider. +func (c *CloudProvider) NodeGroups() []cloudprovider.NodeGroup { + c.Lock() + defer c.Unlock() + var res []cloudprovider.NodeGroup + for _, g := range c.groups { + res = append(res, g) + } + return res +} + +// NodeGroupForNode returns the node group that a given node belongs to. 
+func (c *CloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) { + c.Lock() + defer c.Unlock() + groupId, ok := c.nodeToGroup[node.Name] + if !ok { + return nil, nil + } + return c.groups[groupId], nil +} + +// HasInstance returns true if the given node is managed by this cloud provider. +func (c *CloudProvider) HasInstance(node *apiv1.Node) (bool, error) { + c.Lock() + defer c.Unlock() + _, found := c.nodeToGroup[node.Name] + return found, nil +} + +// GetResourceLimiter generates a new limiter based on our current internal maps. +func (c *CloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) { + c.Lock() + defer c.Unlock() + return cloudprovider.NewResourceLimiter(c.minLimits, c.maxLimits), nil +} + +// GPULabel returns the label used to identify GPU types in this provider. +func (c *CloudProvider) GPULabel() string { return "gpu-label" } + +// GetAvailableGPUTypes returns a map of all GPU types available in this provider. +func (c *CloudProvider) GetAvailableGPUTypes() map[string]struct{} { return nil } + +// GetNodeGpuConfig returns the GPU configuration for a specific node. +func (c *CloudProvider) GetNodeGpuConfig(node *apiv1.Node) *cloudprovider.GpuConfig { return nil } + +// Cleanup performs any necessary teardown of the CloudProvider. +func (c *CloudProvider) Cleanup() error { return nil } + +// Refresh updates the internal state of the CloudProvider. +func (c *CloudProvider) Refresh() error { return nil } + +// Name returns the name of the cloud provider. +func (c *CloudProvider) Name() string { return "Provider" } + +// Pricing returns the pricing model associated with the provider. +func (c *CloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) { + return nil, cloudprovider.ErrNotImplemented +} + +// GetAvailableMachineTypes returns the machine types supported by the provider. 
+func (c *CloudProvider) GetAvailableMachineTypes() ([]string, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// NewNodeGroup creates a new node group based on the provided specifications. +func (c *CloudProvider) NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string, + taints []apiv1.Taint, extraResources map[string]resource.Quantity) (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// NodeGroupOption is a function that configures a NodeGroup during creation. +type NodeGroupOption func(*NodeGroup) + +// WithNode adds a single initial node to the group and +// automatically sets the group's template based on that node. +func WithNode(node *apiv1.Node) NodeGroupOption { + return func(n *NodeGroup) { + n.provider.nodeToGroup[node.Name] = n.id + n.instances[node.Name] = cloudprovider.InstanceRunning + n.targetSize = 1 + n.template = framework.NewTestNodeInfo(node.DeepCopy()) + if n.provider.k8s != nil { + n.provider.k8s.AddNode(node) + } + } +} + +// WithMinSize sets the minimum size of the node group. +func WithMinSize(min int) NodeGroupOption { + return func(n *NodeGroup) { + n.minSize = min + } +} + +// WithMaxSize sets the maximum size of the node group. +func WithMaxSize(max int) NodeGroupOption { + return func(n *NodeGroup) { + n.maxSize = max + } +} + +// WithTemplate sets the node template for the node group. +func WithTemplate(template *framework.NodeInfo) NodeGroupOption { + return func(n *NodeGroup) { + n.template = template + } +} + +// AddNodeGroup is a helper for tests to add a group with its template. 
+func (c *CloudProvider) AddNodeGroup(id string, opts ...NodeGroupOption) { + c.Lock() + defer c.Unlock() + + group := &NodeGroup{ + id: id, + minSize: defaultMinSize, + maxSize: defaultMaxSize, + targetSize: 0, + instances: make(map[string]cloudprovider.InstanceState), + provider: c, + } + + for _, opt := range opts { + opt(group) + } + c.groups[id] = group +} + +// GetNodeGroup is a helper for tests to get a node group. +func (c *CloudProvider) GetNodeGroup(id string) cloudprovider.NodeGroup { + c.Lock() + defer c.Unlock() + return c.groups[id] +} + +// AddNode connects a node name to a group ID. +func (c *CloudProvider) AddNode(groupId string, node *apiv1.Node) { + c.Lock() + defer c.Unlock() + c.nodeToGroup[node.Name] = groupId + if g, ok := c.groups[groupId].(*NodeGroup); ok { + g.Lock() + defer g.Unlock() + g.instances[node.Name] = cloudprovider.InstanceRunning + g.targetSize++ + } +} + +// SetResourceLimit allows the test to reach in and change the limits. +func (c *CloudProvider) SetResourceLimit(resource string, min, max int64) { + c.Lock() + defer c.Unlock() + c.minLimits[resource] = min + c.maxLimits[resource] = max +} + +// NodeGroup is a fake implementation of the cloudprovider.NodeGroup interface for testing. +type NodeGroup struct { + sync.RWMutex + id string + minSize int + maxSize int + targetSize int + template *framework.NodeInfo + // instances maps instanceID -> state. + instances map[string]cloudprovider.InstanceState + provider *CloudProvider +} + +// MaxSize returns the maximum size of the node group. +func (n *NodeGroup) MaxSize() int { + return n.maxSize +} + +// MinSize returns the minimum size of the node group. +func (n *NodeGroup) MinSize() int { + return n.minSize +} + +// AtomicIncreaseSize is a version of IncreaseSize that increases the size of the node group atomically. 
+func (n *NodeGroup) AtomicIncreaseSize(delta int) error { + return n.IncreaseSize(delta) +} + +// DeleteNodes removes specific nodes from the node group and updates the internal mapping. +func (n *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error { + n.Lock() + defer n.Unlock() + + n.provider.Lock() + defer n.provider.Unlock() + + deletedCount := 0 + for _, node := range nodes { + if groupId, exists := n.provider.nodeToGroup[node.Name]; exists && groupId == n.id { + delete(n.provider.nodeToGroup, node.Name) + delete(n.instances, node.Name) + if n.provider.k8s != nil { + n.provider.k8s.DeleteNode(node.Name) + } + deletedCount++ + } else { + fmt.Printf("Warning: node %s not found in group %s or already deleted.", node.Name, n.id) + } + } + + if n.targetSize >= deletedCount { + n.targetSize -= deletedCount + } else { + n.targetSize = 0 + } + + return nil +} + +// ForceDeleteNodes deletes nodes without checking for specific conditions (fake implementation). +func (n *NodeGroup) ForceDeleteNodes(nodes []*apiv1.Node) error { + return n.DeleteNodes(nodes) +} + +// DecreaseTargetSize reduces the target size of the node group by the specified delta. +func (n *NodeGroup) DecreaseTargetSize(delta int) error { + n.Lock() + defer n.Unlock() + n.targetSize -= delta + return nil +} + +// Id returns the unique identifier of the node group. +func (n *NodeGroup) Id() string { + return n.id +} + +// Debug returns a string representation of the node group's current state. +func (n *NodeGroup) Debug() string { + return fmt.Sprintf("NodeGroup{id: %s, targetSize: %d}", n.id, n.targetSize) +} + +// Nodes returns a list of all instances currently existing in this node group. 
+func (n *NodeGroup) Nodes() ([]cloudprovider.Instance, error) { + n.provider.Lock() + defer n.provider.Unlock() + + var instances []cloudprovider.Instance + for id, state := range n.instances { + instances = append(instances, cloudprovider.Instance{ + Id: id, + Status: &cloudprovider.InstanceStatus{ + State: state, + }, + }) + } + return instances, nil +} + +// Exist returns true if the node group currently exists in the cloud provider. +func (n *NodeGroup) Exist() bool { + return true +} + +// Create creates the node group in the cloud provider (not implemented). +func (n *NodeGroup) Create() (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// Delete deletes the node group from the cloud provider (not implemented). +func (n *NodeGroup) Delete() error { + return cloudprovider.ErrNotImplemented +} + +// Autoprovisioned returns true if the node group is autoprovisioned. +func (n *NodeGroup) Autoprovisioned() bool { + return false +} + +// GetOptions returns autoscaling options specific to this node group. +func (n *NodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { + return nil, nil +} + +// TargetSize returns the current target size of the node group. +func (n *NodeGroup) TargetSize() (int, error) { return n.targetSize, nil } + +// IncreaseSize adds nodes to the node group and updates internal instance mapping. 
+func (n *NodeGroup) IncreaseSize(delta int) error { + n.Lock() + defer n.Unlock() + if n.targetSize+delta > n.maxSize { + return fmt.Errorf("size too large") + } + + n.provider.Lock() + defer n.provider.Unlock() + + for i := 0; i < delta; i++ { + instanceNum := n.targetSize + i + instanceId := fmt.Sprintf("%s-node-%d", n.id, instanceNum) + + if n.template == nil || n.template.Node() == nil { + return fmt.Errorf("node group %s has no template to create new nodes", n.id) + } + newNode := n.template.Node().DeepCopy() + newNode.Name = instanceId + newNode.Spec.ProviderID = instanceId + + n.instances[instanceId] = cloudprovider.InstanceRunning + n.provider.nodeToGroup[instanceId] = n.id + if n.provider.k8s != nil { + n.provider.k8s.AddNode(newNode) + } + } + n.targetSize += delta + return nil +} + +// TemplateNodeInfo returns the template node information for this node group. +func (n *NodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) { + if n.template == nil { + return nil, cloudprovider.ErrNotImplemented + } + return n.template, nil +} + +// GetTargetSize returns the target size as a raw integer (helper method). +func (n *NodeGroup) GetTargetSize() int { return n.targetSize } diff --git a/cluster-autoscaler/context/autoscaling_context.go b/cluster-autoscaler/context/autoscaling_context.go index 16633c5ff013..ec96c486b2c3 100644 --- a/cluster-autoscaler/context/autoscaling_context.go +++ b/cluster-autoscaler/context/autoscaling_context.go @@ -17,6 +17,7 @@ limitations under the License. package context import ( + "context" "time" appsv1 "k8s.io/api/apps/v1" @@ -161,9 +162,9 @@ func NewAutoscalingContext( } // NewAutoscalingKubeClients builds AutoscalingKubeClients out of basic client. 
-func NewAutoscalingKubeClients(opts config.AutoscalingOptions, kubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) *AutoscalingKubeClients { +func NewAutoscalingKubeClients(ctx context.Context, opts config.AutoscalingOptions, kubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) *AutoscalingKubeClients { listerRegistry := kube_util.NewListerRegistryWithDefaultListers(informerFactory) - kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient, opts.RecordDuplicatedEvents) + kubeEventRecorder := kube_util.CreateEventRecorder(ctx, kubeClient, opts.RecordDuplicatedEvents) logRecorder, err := utils.NewStatusMapRecorder(kubeClient, opts.ConfigNamespace, kubeEventRecorder, opts.WriteStatusConfigMap, opts.StatusConfigMapName) if err != nil { klog.Error("Failed to initialize status configmap, unable to write status events") diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 6cc78b515a23..cbc604ea6815 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -17,6 +17,7 @@ limitations under the License. 
package core import ( + "context" "strings" "time" @@ -57,8 +58,8 @@ type Autoscaler interface { } // NewAutoscaler creates an autoscaler of an appropriate type according to the parameters -func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers.SharedInformerFactory) (Autoscaler, errors.AutoscalerError) { - err := initializeDefaultOptions(&opts, informerFactory) +func NewAutoscaler(ctx context.Context, opts coreoptions.AutoscalerOptions, informerFactory informers.SharedInformerFactory) (Autoscaler, errors.AutoscalerError) { + err := initializeDefaultOptions(ctx, &opts, informerFactory) if err != nil { return nil, errors.ToAutoscalerError(errors.InternalError, err) } @@ -85,7 +86,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers } // Initialize default options if not provided. -func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFactory informers.SharedInformerFactory) error { +func initializeDefaultOptions(ctx context.Context, opts *coreoptions.AutoscalerOptions, informerFactory informers.SharedInformerFactory) error { if opts.Processors == nil { opts.Processors = ca_processors.DefaultProcessors(opts.AutoscalingOptions) } @@ -93,10 +94,10 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto opts.LoopStartNotifier = loopstart.NewObserversList(nil) } if opts.AutoscalingKubeClients == nil { - opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory) + opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(ctx, opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory) } if opts.FrameworkHandle == nil { - fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled) + fwHandle, err := framework.NewHandle(ctx, opts.InformerFactory, opts.SchedulerConfig, 
opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled) if err != nil { return err } diff --git a/cluster-autoscaler/core/bench/benchmark_runonce_test.go b/cluster-autoscaler/core/bench/benchmark_runonce_test.go new file mode 100644 index 000000000000..da30f339f78d --- /dev/null +++ b/cluster-autoscaler/core/bench/benchmark_runonce_test.go @@ -0,0 +1,419 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bench + +import ( + "context" + "fmt" + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/builder" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/core" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/loop" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + fakek8s "k8s.io/autoscaler/cluster-autoscaler/utils/fake" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" +) + +// Benchmark evaluates the performance of the Cluster Autoscaler's primary control loop (RunOnce). 
// The benchmark simulates a cluster ecosystem by mocking two critical layers:
// - Kubeclient (fakeClient): Acts as the Kubernetes API "source of truth," managing Pod and Node
//   objects, simulating node registration, and mimicking scheduler behavior.
// - CloudProvider (testprovider): Acts as the infrastructure abstraction, managing NodeGroups
//   and providing the capacity templates used for scale-up simulations.
//
// This tool is designed for comparative analysis between patches to detect performance
// regressions in the core autoscaling logic.
//
// Current Simplifications:
// - Homogeneous Templates: Uses uniform Node and Pod templates.
// - Discrete Iterations: Advances a virtual clock to simulate the asynchronous stabilization
//   of the cluster (e.g., node provisioning and pod placement).

// benchmarkConfig holds the sizing knobs shared by all benchmark scenarios.
type benchmarkConfig struct {
	nodeAvailableCPU    int64 // allocatable CPU per simulated node (see BuildTestNode for units)
	nodeAvailableMemory int64 // allocatable memory per simulated node
	nodeGroupMaxSize    int   // max size of the single node group "ng1"
	podsPerNode         int   // how many pods the fake scheduler packs onto one node
	maxCoresTotal       int64 // cluster-wide core limit fed to the cloud provider
	maxMemoryTotal      int64 // cluster-wide memory limit fed to the cloud provider
}

// defaultConfig returns limits large enough that scenarios hit their own
// stop conditions rather than cluster-wide resource caps.
func defaultConfig() benchmarkConfig {
	return benchmarkConfig{
		nodeAvailableCPU:    10000,
		nodeAvailableMemory: 10000,
		nodeGroupMaxSize:    10000,
		podsPerNode:         100,
		maxCoresTotal:       10000 * 10000,
		maxMemoryTotal:      10000 * 10000 * 1024 * 1024 * 1024,
	}
}

// benchmarkScenario describes one benchmark case: how the cluster starts,
// how the pod population changes each iteration, and when the run ends.
type benchmarkScenario struct {
	name   string
	config benchmarkConfig
	// init seeds the fake API server and cloud provider with the starting nodes.
	init func(c *fake.Clientset, p *testprovider.CloudProvider, cfg benchmarkConfig) ([]*apiv1.Node, error)
	// podManipulator mutates the pod population between autoscaler iterations
	// (e.g. add pending pods to force scale-up, or delete pods to force scale-down).
	podManipulator func(c *fake.Clientset, cfg benchmarkConfig) error
	// stopCondition ends the run once the ready-node count reaches the target.
	stopCondition func(readyNodes int) bool
	// customizeOptions optionally tweaks AutoscalingOptions after defaults are set.
	customizeOptions func(*config.AutoscalingOptions)
}

// runBenchmark drives the scenario b.N times. Only stepAutoscaler (RunOnce)
// runs inside the timed region; simulation bookkeeping is excluded via
// StopTimer/StartTimer so the measurement isolates the CA control loop.
func runBenchmark(b *testing.B, sc benchmarkScenario) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		runner := newBenchmarkRunner(b, sc)
		for {
			b.StartTimer()
			runner.stepAutoscaler()
			b.StopTimer()

			if runner.stepSimulation() {
				break
			}
		}
	}
}

// benchmarkRunner holds the per-iteration state of one scenario run,
// including the virtual clock passed to RunOnce.
type benchmarkRunner struct {
	b          *testing.B
	sc         benchmarkScenario
	fakeClient *fake.Clientset
	autoscaler core.Autoscaler
	// currentTime is a virtual clock advanced by stepSimulation; it decouples
	// the benchmark from wall-clock time.
	currentTime time.Time
}

// stepAutoscaler executes one timed pass of the CA main loop.
func (r *benchmarkRunner) stepAutoscaler() {
	err := r.autoscaler.RunOnce(r.currentTime)
	if err != nil {
		r.b.Fatalf("RunOnce failed: %v", err)
	}
}

// stepSimulation advances the mocked cluster between autoscaler passes and
// reports whether the scenario's stop condition has been reached.
func (r *benchmarkRunner) stepSimulation() bool {
	// Simulate nodes setup.
	// Make any new nodes instantly ready and schedulable.
	readyNodes := r.makeNodesReady()

	if r.sc.stopCondition(readyNodes) {
		return true
	}

	if err := r.sc.podManipulator(r.fakeClient, r.sc.config); err != nil {
		r.b.Fatalf("failed to manipulate pods: %v", err)
	}

	// Simulate scheduler.
	r.trySchedulePods()

	// Allow caches to populate.
	time.Sleep(100 * time.Millisecond)

	// Advance time to the next autoscaler RunOnce loop.
	r.currentTime = r.currentTime.Add(5 * time.Second)

	return false
}

// newBenchmarkRunner wires up the fake client, fake cloud provider, and a
// fully built autoscaler for one scenario run. Fails the benchmark on any
// setup error.
func newBenchmarkRunner(b *testing.B, sc benchmarkScenario) *benchmarkRunner {
	fakeClient := fake.NewClientset()
	informerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
	k8s := fakek8s.NewKubernetes(fakeClient, informerFactory)
	provider := testprovider.NewCloudProvider(k8s)
	provider.SetResourceLimit(cloudprovider.ResourceNameCores, 0, sc.config.maxCoresTotal)
	provider.SetResourceLimit(cloudprovider.ResourceNameMemory, 0, sc.config.maxMemoryTotal)

	// Setup Nodes.
	_, err := sc.init(fakeClient, provider, sc.config)
	if err != nil {
		b.Fatalf("Failed to init nodes: %v", err)
	}

	// Setup Autoscaler.
	options := config.AutoscalingOptions{
		NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
			ScaleDownUnneededTime:         time.Minute,
			ScaleDownUnreadyTime:          time.Minute,
			ScaleDownUtilizationThreshold: 0.5,
			MaxNodeProvisionTime:          10 * time.Second,
		},
		EstimatorName:                  estimator.BinpackingEstimatorName,
		ExpanderNames:                  "random",
		EnforceNodeGroupMinSize:        true,
		ScaleDownEnabled:               true,
		MaxNodesTotal:                  sc.config.nodeGroupMaxSize,
		MaxCoresTotal:                  sc.config.maxCoresTotal,
		MaxMemoryTotal:                 sc.config.maxMemoryTotal,
		OkTotalUnreadyCount:            1,
		MaxBinpackingTime:              1 * time.Second,
		MaxNodeGroupBinpackingDuration: 1 * time.Second,
		ScaleDownSimulationTimeout:     1 * time.Second,
		MaxScaleDownParallelism:        10,
	}

	if sc.customizeOptions != nil {
		sc.customizeOptions(&options)
	}

	debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(false)
	autoscaler, _, err := builder.New(options).
		WithKubeClient(fakeClient).
		WithInformerFactory(informerFactory).
		WithCloudProvider(provider).
		WithListerRegistry(k8s.ListerRegistry()).
		WithPodObserver(&loop.UnschedulablePodObserver{}).
		Build(context.Background(), debuggingSnapshotter)
	if err != nil {
		b.Fatalf("Failed to build autoscaler: %v", err)
	}

	return &benchmarkRunner{
		b:           b,
		sc:          sc,
		fakeClient:  fakeClient,
		autoscaler:  autoscaler,
		currentTime: time.Now(),
	}
}

// makeNodesReady marks every not-yet-ready node as ready and schedulable
// (instant provisioning) and returns the total node count.
func (r *benchmarkRunner) makeNodesReady() int {
	nodeList, err := r.fakeClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	if err != nil {
		r.b.Fatalf("failed to list nodes: %v", err)
	}
	for _, n := range nodeList.Items {
		if !kube_util.IsNodeReadyAndSchedulable(&n) {
			updated := n.DeepCopy()
			SetNodeReadyState(updated, true, r.currentTime)
			RemoveNodeNotReadyTaint(updated)
			if _, err := r.fakeClient.CoreV1().Nodes().Update(context.Background(), updated, metav1.UpdateOptions{}); err != nil {
				r.b.Fatalf("failed to update nodes: %v", err)
			}
		}
	}
	return len(nodeList.Items)
}

// TODO: Speed up pods scheduling.
// CPU profile shows big contribution of trySchedulePods method. Reduce it.
// This might be completely "artificial" method.
+func (r *benchmarkRunner) trySchedulePods() { + allPods, err := r.fakeClient.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) + if err != nil { + r.b.Fatalf("failed to list pods: %v", err) + } + + var unschedulablePods []*apiv1.Pod + podsPerNode := make(map[string]int) + for i := range allPods.Items { + if allPods.Items[i].Spec.NodeName == "" { + unschedulablePods = append(unschedulablePods, &allPods.Items[i]) + } else { + podsPerNode[allPods.Items[i].Spec.NodeName]++ + } + } + + allNodes, err := r.fakeClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + if err != nil { + r.b.Fatalf("failed to list nodes: %v", err) + } + var readyNodes []*apiv1.Node + for i := range allNodes.Items { + if kube_util.IsNodeReadyAndSchedulable(&allNodes.Items[i]) { + readyNodes = append(readyNodes, &allNodes.Items[i]) + } + } + + for _, node := range readyNodes { + for podsPerNode[node.Name] < r.sc.config.podsPerNode && len(unschedulablePods) > 0 { + var pod *apiv1.Pod + pod, unschedulablePods = unschedulablePods[0], unschedulablePods[1:] + podCopy := pod.DeepCopy() + podCopy.Spec.NodeName = node.Name + _, err := r.fakeClient.CoreV1().Pods("default").Update(context.Background(), podCopy, metav1.UpdateOptions{}) + if err != nil { + r.b.Fatalf("Failed to schedule pod: %v", err) + } + podsPerNode[node.Name]++ + } + } +} + +func addPods(perCall int) func(c *fake.Clientset, cfg benchmarkConfig) error { + added := 0 + return func(c *fake.Clientset, cfg benchmarkConfig) error { + for i := 0; i < perCall; i++ { + name := fmt.Sprintf("pod-%d", added) + cpu := cfg.nodeAvailableCPU / int64(cfg.podsPerNode) + mem := cfg.nodeAvailableMemory / int64(cfg.podsPerNode) + pod := BuildTestPod(name, cpu, mem, MarkUnschedulable()) + added++ + + _, err := c.CoreV1().Pods("default").Create(context.Background(), pod, metav1.CreateOptions{}) + if err != nil { + return err + } + } + return nil + } +} + +func initNodes(n int) func(c *fake.Clientset, p 
*testprovider.CloudProvider, cfg benchmarkConfig) ([]*apiv1.Node, error) { + return func(c *fake.Clientset, p *testprovider.CloudProvider, cfg benchmarkConfig) ([]*apiv1.Node, error) { + nodes := make([]*apiv1.Node, n) + for j := 0; j < n; j++ { + name := fmt.Sprintf("n-%d", j) + node := BuildTestNode(name, cfg.nodeAvailableCPU, cfg.nodeAvailableMemory) + SetNodeReadyState(node, true, time.Now()) + nodes[j] = node + _, err := c.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + } + + nTemplate := BuildTestNode("n-template", cfg.nodeAvailableCPU, cfg.nodeAvailableMemory) + SetNodeReadyState(nTemplate, true, time.Now()) + tni := framework.NewTestNodeInfo(nTemplate) + + p.AddNodeGroup("ng1", + testprovider.WithTemplate(tni), + testprovider.WithMaxSize(cfg.nodeGroupMaxSize), + ) + for _, node := range nodes { + p.AddNode("ng1", node) + } + + return nodes, nil + } +} + +func stopAtNodesGreaterOrEqual(n int) func(readyNodes int) bool { + return func(readyNodes int) bool { + return readyNodes >= n + } +} + +func stopAtNodesLessOrEqual(n int) func(readyNodes int) bool { + return func(readyNodes int) bool { + return readyNodes <= n + } +} + +func BenchmarkRunOnceScaleUp500Nodes(b *testing.B) { + sc := benchmarkScenario{ + name: "ScaleUp500Nodes", + config: defaultConfig(), + init: initNodes(1), + stopCondition: stopAtNodesGreaterOrEqual(500), + podManipulator: addPods(30), // Assuming 100 pods can fit on a node (see defaultConfig). 
+ } + + runBenchmark(b, sc) +} + +func initNodesWithPods(n int) func(c *fake.Clientset, p *testprovider.CloudProvider, cfg benchmarkConfig) ([]*apiv1.Node, error) { + return func(c *fake.Clientset, p *testprovider.CloudProvider, cfg benchmarkConfig) ([]*apiv1.Node, error) { + nodes := make([]*apiv1.Node, n) + for j := range n { + name := fmt.Sprintf("n-%d", j) + node := BuildTestNode(name, cfg.nodeAvailableCPU, cfg.nodeAvailableMemory) + SetNodeReadyState(node, true, time.Now()) + nodes[j] = node + _, err := c.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + // Add pods + for k := range cfg.podsPerNode { + podName := fmt.Sprintf("pod-%s-%d", name, k) + cpu := cfg.nodeAvailableCPU / int64(cfg.podsPerNode) + mem := cfg.nodeAvailableMemory / int64(cfg.podsPerNode) + pod := BuildTestPod(podName, cpu, mem) + pod.Spec.NodeName = name + _, err := c.CoreV1().Pods("default").Create(context.Background(), pod, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + } + } + + nTemplate := BuildTestNode("n-template", cfg.nodeAvailableCPU, cfg.nodeAvailableMemory) + SetNodeReadyState(nTemplate, true, time.Now()) + tni := framework.NewTestNodeInfo(nTemplate) + + p.AddNodeGroup("ng1", + testprovider.WithTemplate(tni), + testprovider.WithMaxSize(cfg.nodeGroupMaxSize), + ) + for _, node := range nodes { + p.AddNode("ng1", node) + } + + return nodes, nil + } +} + +func removePods(perCall int) func(c *fake.Clientset, _ benchmarkConfig) error { + return func(c *fake.Clientset, _ benchmarkConfig) error { + pods, err := c.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) + if err != nil { + return err + } + + toRemove := min(perCall, len(pods.Items)) + for i := range toRemove { + err := c.CoreV1().Pods("default").Delete(context.Background(), pods.Items[i].Name, metav1.DeleteOptions{}) + if err != nil { + return err + } + } + return nil + } +} + +func BenchmarkRunOnceScaleDown(b 
*testing.B) { + sc := benchmarkScenario{ + name: "ScaleDown10Nodes", + config: defaultConfig(), + init: initNodesWithPods(500), + stopCondition: stopAtNodesLessOrEqual(1), + podManipulator: removePods(30), + customizeOptions: func(opts *config.AutoscalingOptions) { + opts.NodeGroupDefaults.ScaleDownUnneededTime = 0 + opts.NodeGroupDefaults.ScaleDownUnreadyTime = 0 + // Avoid long waits for unneeded nodes + }, + } + + runBenchmark(b, sc) +} + +// TODO: Add DRA scenario. diff --git a/cluster-autoscaler/core/static_autoscaler_csi_test.go b/cluster-autoscaler/core/static_autoscaler_csi_test.go index c0872e2bc4d5..271d10049b6e 100644 --- a/cluster-autoscaler/core/static_autoscaler_csi_test.go +++ b/cluster-autoscaler/core/static_autoscaler_csi_test.go @@ -17,6 +17,7 @@ limitations under the License. package core import ( + "context" "fmt" "testing" "time" @@ -222,7 +223,7 @@ func TestStaticAutoscalerCSI(t *testing.T) { // Create a framework handle with informer-backed listers for StorageClass/PVC/CSIDriver. client := clientsetfake.NewSimpleClientset(k8sObjects...) 
informerFactory := informers.NewSharedInformerFactory(client, 0) - fwHandle, err := framework.NewHandle(informerFactory, nil, false, true) + fwHandle, err := framework.NewHandle(context.Background(), informerFactory, nil, false, true) require.NoError(t, err) stopCh := make(chan struct{}) t.Cleanup(func() { close(stopCh) }) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 4580bc0b7fad..f1b9235dd5fe 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -18,7 +18,7 @@ package main import ( "context" - "fmt" + "k8s.io/autoscaler/cluster-autoscaler/config" "net/http" "os" "os/signal" @@ -27,49 +27,17 @@ import ( "github.com/spf13/pflag" - capacityclient "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client" - "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common" - "k8s.io/autoscaler/cluster-autoscaler/config/flags" - "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" - "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" - "k8s.io/autoscaler/cluster-autoscaler/loop" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" - "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate" - "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" - "k8s.io/kubernetes/pkg/features" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server/mux" "k8s.io/apiserver/pkg/server/routes" utilfeature "k8s.io/apiserver/pkg/util/feature" - capacitybuffer "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/controller" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + 
autoscalerbuilder "k8s.io/autoscaler/cluster-autoscaler/builder" + "k8s.io/autoscaler/cluster-autoscaler/config/flags" "k8s.io/autoscaler/cluster-autoscaler/core" - coreoptions "k8s.io/autoscaler/cluster-autoscaler/core/options" - "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/loop" "k8s.io/autoscaler/cluster-autoscaler/metrics" - "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" - ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" - cbprocessor "k8s.io/autoscaler/cluster-autoscaler/processors/capacitybuffer" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" - "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection" - podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" - "k8s.io/autoscaler/cluster-autoscaler/processors/pods" - "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" - "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" - "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" - "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" - "k8s.io/autoscaler/cluster-autoscaler/processors/status" - provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator" - "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" - "k8s.io/autoscaler/cluster-autoscaler/simulator/options" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/version" "k8s.io/client-go/informers" @@ -83,6 +51,7 @@ import ( _ "k8s.io/component-base/logs/json/register" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" ) func registerSignalHandlers(autoscaler core.Autoscaler) { @@ -100,203 +69,6 @@ func 
registerSignalHandlers(autoscaler core.Autoscaler) { }() } -func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) { - // Get AutoscalingOptions from flags. - autoscalingOptions := flags.AutoscalingOptions() - - kubeClient := kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts) - - // Informer transform to trim ManagedFields for memory efficiency. - trim := func(obj interface{}) (interface{}, error) { - if accessor, err := meta.Accessor(obj); err == nil { - accessor.SetManagedFields(nil) - } - return obj, nil - } - informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(trim)) - - fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.CSINodeAwareSchedulingEnabled) - if err != nil { - return nil, nil, err - } - deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) - drainabilityRules := rules.Default(deleteOptions) - - var snapshotStore clustersnapshot.ClusterSnapshotStore = store.NewDeltaSnapshotStore(autoscalingOptions.ClusterSnapshotParallelism) - opts := coreoptions.AutoscalerOptions{ - AutoscalingOptions: autoscalingOptions, - FrameworkHandle: fwHandle, - ClusterSnapshot: predicate.NewPredicateSnapshot(snapshotStore, fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled, autoscalingOptions.PredicateParallelism, autoscalingOptions.CSINodeAwareSchedulingEnabled), - KubeClient: kubeClient, - InformerFactory: informerFactory, - DebuggingSnapshotter: debuggingSnapshotter, - DeleteOptions: deleteOptions, - DrainabilityRules: drainabilityRules, - ScaleUpOrchestrator: orchestrator.New(), - } - - opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) - opts.Processors.TemplateNodeInfoProvider = 
nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(&autoscalingOptions.NodeInfoCacheExpireTime, autoscalingOptions.ForceDaemonSets) - podListProcessor := podlistprocessor.NewDefaultPodListProcessor(scheduling.ScheduleAnywhere) - - var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector - if autoscalingOptions.ProvisioningRequestEnabled { - podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) - - restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - client, err := provreqclient.NewProvisioningRequestClient(restConfig) - if err != nil { - return nil, nil, err - } - - ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize, opts.CheckCapacityBatchProcessing, opts.CheckCapacityProcessorInstance) - if err != nil { - return nil, nil, err - } - podListProcessor.AddProcessor(ProvisioningRequestInjector) - - var provisioningRequestPodsInjector *provreq.ProvisioningRequestPodsInjector - if autoscalingOptions.CheckCapacityBatchProcessing { - klog.Infof("Batch processing for check capacity requests is enabled. 
Passing provisioning request injector to check capacity processor.") - provisioningRequestPodsInjector = ProvisioningRequestInjector - } - - provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{ - checkcapacity.New(client, provisioningRequestPodsInjector), - besteffortatomic.New(client), - }) - - scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) - opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor := provreq.NewProvReqProcessor(client, opts.CheckCapacityProcessorInstance) - opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor}) - - podListProcessor.AddProcessor(provreqProcesor) - - opts.Processors.ScaleUpEnforcer = provreq.NewProvisioningRequestScaleUpEnforcer() - } - - var capacitybufferClient *capacityclient.CapacityBufferClient - var capacitybufferClientError error - if autoscalingOptions.CapacitybufferControllerEnabled { - restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - capacitybufferClient, capacitybufferClientError = capacityclient.NewCapacityBufferClientFromConfig(restConfig) - if capacitybufferClientError == nil && capacitybufferClient != nil { - nodeBufferController := capacitybuffer.NewDefaultBufferController(capacitybufferClient) - go nodeBufferController.Run(make(chan struct{})) - } - } - - if autoscalingOptions.CapacitybufferPodInjectionEnabled { - if capacitybufferClient == nil { - restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - capacitybufferClient, capacitybufferClientError = capacityclient.NewCapacityBufferClientFromConfig(restConfig) - } - if capacitybufferClientError == nil && capacitybufferClient != nil { - buffersPodsRegistry := cbprocessor.NewDefaultCapacityBuffersFakePodsRegistry() - bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor( - capacitybufferClient, - []string{common.ActiveProvisioningStrategy}, - buffersPodsRegistry, true) - 
podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{bufferPodInjector, podListProcessor}) - opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{ - cbprocessor.NewFakePodsScaleUpStatusProcessor(buffersPodsRegistry), opts.Processors.ScaleUpStatusProcessor}) - } - } - - if autoscalingOptions.ProactiveScaleupEnabled { - podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry() - - podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry) - enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(autoscalingOptions.PodInjectionLimit) - - podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor}) - - // FakePodsScaleUpStatusProcessor processor needs to be the first processor in ScaleUpStatusProcessor before the default processor - // As it filters out fake pods from Scale Up status so that we don't emit events. 
- opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry), opts.Processors.ScaleUpStatusProcessor}) - } - - opts.Processors.PodListProcessor = podListProcessor - sdCandidatesSorting := previouscandidates.NewPreviousCandidates() - scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{ - emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, drainabilityRules), - sdCandidatesSorting, - } - opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting) - - cp := scaledowncandidates.NewCombinedScaleDownCandidatesProcessor() - cp.Register(scaledowncandidates.NewScaleDownCandidatesSortingProcessor(scaleDownCandidatesComparers)) - - if autoscalingOptions.ScaleDownDelayTypeLocal { - sdp := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() - cp.Register(sdp) - opts.Processors.ScaleStateNotifier.Register(sdp) - - } - opts.Processors.ScaleDownNodeProcessor = cp - - var nodeInfoComparator nodegroupset.NodeInfoComparator - if len(autoscalingOptions.BalancingLabels) > 0 { - nodeInfoComparator = nodegroupset.CreateLabelNodeInfoComparator(autoscalingOptions.BalancingLabels) - } else { - // TODO elmiko - now that we are passing the AutoscalerOptions in to the - // NewCloudProvider function, we should migrate these cloud provider specific - // configurations to the NewCloudProvider method so that we remove more provider - // code from the core. 
- nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator - if autoscalingOptions.CloudProviderName == cloudprovider.AzureProviderName { - nodeInfoComparatorBuilder = nodegroupset.CreateAzureNodeInfoComparator - } else if autoscalingOptions.CloudProviderName == cloudprovider.AwsProviderName { - nodeInfoComparatorBuilder = nodegroupset.CreateAwsNodeInfoComparator - opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAsgTagResourceNodeInfoProvider(&autoscalingOptions.NodeInfoCacheExpireTime, autoscalingOptions.ForceDaemonSets) - } else if autoscalingOptions.CloudProviderName == cloudprovider.GceProviderName { - nodeInfoComparatorBuilder = nodegroupset.CreateGceNodeInfoComparator - opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewAnnotationNodeInfoProvider(&autoscalingOptions.NodeInfoCacheExpireTime, autoscalingOptions.ForceDaemonSets) - } - nodeInfoComparator = nodeInfoComparatorBuilder(autoscalingOptions.BalancingExtraIgnoredLabels, autoscalingOptions.NodeGroupSetRatios) - } - - opts.Processors.NodeGroupSetProcessor = &nodegroupset.BalancingNodeGroupSetProcessor{ - Comparator: nodeInfoComparator, - } - - // These metrics should be published only once. - metrics.UpdateCPULimitsCores(autoscalingOptions.MinCoresTotal, autoscalingOptions.MaxCoresTotal) - metrics.UpdateMemoryLimitsBytes(autoscalingOptions.MinMemoryTotal, autoscalingOptions.MaxMemoryTotal) - - // Initialize metrics. - metrics.InitMetrics() - - // Create autoscaler. - autoscaler, err := core.NewAutoscaler(opts, informerFactory) - if err != nil { - return nil, nil, err - } - - // Start informers. This must come after fully constructing the autoscaler because - // additional informers might have been registered in the factory during NewAutoscaler. 
- stop := make(chan struct{}) - informerFactory.Start(stop) - - klog.Info("Initializing resource informers, blocking until caches are synced") - informersSynced := informerFactory.WaitForCacheSync(stop) - for _, synced := range informersSynced { - if !synced { - return nil, nil, fmt.Errorf("unable to start and sync resource informers") - } - } - - podObserver := loop.StartPodObserver(ctx, kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts)) - - // A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a - // ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods - // also marks the ProvisioningRequest as accepted or failed. - trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, podObserver, autoscalingOptions.ScanInterval) - - return autoscaler, trigger, nil -} - func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) { autoscalingOpts := flags.AutoscalingOptions() @@ -304,10 +76,7 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho ctx, cancel := context.WithCancel(context.Background()) defer cancel() - autoscaler, trigger, err := buildAutoscaler(ctx, debuggingSnapshotter) - if err != nil { - klog.Fatalf("Failed to create autoscaler: %v", err) - } + autoscaler, trigger := mustBuildAutoscaler(ctx, autoscalingOpts, debuggingSnapshotter) // Register signal handlers for graceful shutdown. registerSignalHandlers(autoscaler) @@ -339,6 +108,29 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho } } +func mustBuildAutoscaler(ctx context.Context, opts config.AutoscalingOptions, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger) { + kubeClient := kube_util.CreateKubeClient(opts.KubeClientOpts) + + // Informer transform to trim ManagedFields for memory efficiency. 
+ trim := func(obj interface{}) (interface{}, error) { + if accessor, err := meta.Accessor(obj); err == nil { + accessor.SetManagedFields(nil) + } + return obj, nil + } + informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(trim)) + + autoscaler, trigger, err := autoscalerbuilder.New(opts). + WithKubeClient(kubeClient). + WithInformerFactory(informerFactory). + Build(ctx, debuggingSnapshotter) + + if err != nil { + klog.Fatalf("Failed to create autoscaler: %v", err) + } + return autoscaler, trigger +} + func main() { klog.InitFlags(nil) @@ -427,7 +219,7 @@ func main() { kubeClient.CoordinationV1(), resourcelock.ResourceLockConfig{ Identity: id, - EventRecorder: kube_util.CreateEventRecorder(kubeClient, autoscalingOpts.RecordDuplicatedEvents), + EventRecorder: kube_util.CreateEventRecorder(context.TODO(), kubeClient, autoscalingOpts.RecordDuplicatedEvents), }, ) if err != nil { diff --git a/cluster-autoscaler/processors/test/common.go b/cluster-autoscaler/processors/test/common.go index 3b45f53e33bc..2544281477ec 100644 --- a/cluster-autoscaler/processors/test/common.go +++ b/cluster-autoscaler/processors/test/common.go @@ -55,7 +55,7 @@ func NewTestProcessors(options config.AutoscalingOptions) (*processors.Autoscali NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), TemplateNodeInfoProvider: templateNodeInfoProvider, NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), - CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(true, false), + CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(options.DynamicResourceAllocationEnabled, options.CSINodeAwareSchedulingEnabled), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), diff --git 
a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index 0fc0313376a5..d7052643c0bb 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -17,6 +17,7 @@ limitations under the License. package predicate import ( + "context" "fmt" "os" "path/filepath" @@ -361,7 +362,7 @@ func newTestPluginRunnerAndSnapshot(schedConfig *config.KubeSchedulerConfigurati schedConfig = defaultConfig } - fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true, false) + fwHandle, err := framework.NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig, true, false) if err != nil { return nil, nil, err } diff --git a/cluster-autoscaler/simulator/framework/handle.go b/cluster-autoscaler/simulator/framework/handle.go index 07443a3c91c5..39bd80f1eda3 100644 --- a/cluster-autoscaler/simulator/framework/handle.go +++ b/cluster-autoscaler/simulator/framework/handle.go @@ -19,8 +19,6 @@ package framework import ( "context" "fmt" - "sync" - "k8s.io/client-go/informers" schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerconfiglatest "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" @@ -29,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" schedulerframeworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" + "sync" ) var ( @@ -42,7 +41,7 @@ type Handle struct { } // NewHandle builds a framework Handle based on the provided informers and scheduler config. 
-func NewHandle(informerFactory informers.SharedInformerFactory, schedConfig *schedulerconfig.KubeSchedulerConfiguration, draEnabled bool, csiEnabled bool) (*Handle, error) { +func NewHandle(ctx context.Context, informerFactory informers.SharedInformerFactory, schedConfig *schedulerconfig.KubeSchedulerConfiguration, draEnabled bool, csiEnabled bool) (*Handle, error) { if schedConfig == nil { var err error schedConfig, err = schedulerconfiglatest.Default() @@ -76,7 +75,7 @@ func NewHandle(informerFactory informers.SharedInformerFactory, schedConfig *sch schedulermetrics.InitMetrics() }) framework, err := schedulerframeworkruntime.NewFramework( - context.TODO(), + ctx, schedulerplugins.NewInTreeRegistry(), &schedConfig.Profiles[0], opts..., diff --git a/cluster-autoscaler/simulator/framework/test_utils.go b/cluster-autoscaler/simulator/framework/test_utils.go index 16466aa83c58..79c1b3915119 100644 --- a/cluster-autoscaler/simulator/framework/test_utils.go +++ b/cluster-autoscaler/simulator/framework/test_utils.go @@ -17,6 +17,7 @@ limitations under the License. package framework import ( + "context" apiv1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/client-go/informers" @@ -56,7 +57,7 @@ func NewTestFrameworkHandle() (*Handle, error) { if err != nil { return nil, err } - fwHandle, err := NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), defaultConfig, true, true) + fwHandle, err := NewHandle(context.Background(), informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), defaultConfig, true, true) if err != nil { return nil, err } diff --git a/cluster-autoscaler/test/integration/config.go b/cluster-autoscaler/test/integration/config.go new file mode 100644 index 000000000000..eb60c6eb360b --- /dev/null +++ b/cluster-autoscaler/test/integration/config.go @@ -0,0 +1,104 @@ +/* +Copyright 2025 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "time" +) + +// DefaultAutoscalingOptions provides the baseline configuration for all tests. +var DefaultAutoscalingOptions = config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: time.Second, + ScaleDownUnreadyTime: time.Minute, + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + EnforceNodeGroupMinSize: true, + ScaleDownSimulationTimeout: 24 * time.Hour, + ScaleDownDelayAfterAdd: 0, + ScaleDownDelayAfterDelete: 0, + ScaleDownDelayAfterFailure: 0, + ScaleDownDelayTypeLocal: true, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ExpanderNames: "least-waste", + ScaleUpFromZero: true, + FrequentLoopsEnabled: true, + ClusterName: "cluster-test", + MaxBinpackingTime: 10 * time.Second, +} + +// Config is the "blueprint" for a test. It defines the entire +// initial state of the world before the test runs. +type Config struct { + // BaseOptions can be set to DefaultAutoscalingOptions or a custom base. + BaseOptions *config.AutoscalingOptions + // OptionsOverrides allows adding options overrides. + OptionsOverrides []AutoscalingOptionOverride +} + +// NewConfig creates a Config pre-populated with DefaultAutoscalingOptions. 
+func NewConfig() *Config { + return &Config{ + BaseOptions: &DefaultAutoscalingOptions, + OptionsOverrides: []AutoscalingOptionOverride{}, + } +} + +// AutoscalingOptionOverride is a function that modifies an AutoscalingOptions object. +type AutoscalingOptionOverride func(*config.AutoscalingOptions) + +// WithOverrides allows adding options overrides to the config. +func (c *Config) WithOverrides(overrides ...AutoscalingOptionOverride) *Config { + c.OptionsOverrides = append(c.OptionsOverrides, overrides...) + return c +} + +// ResolveOptions merges the base options with all registered overrides. +func (c *Config) ResolveOptions() config.AutoscalingOptions { + var opts config.AutoscalingOptions + if c.BaseOptions != nil { + opts = *c.BaseOptions + } else { + opts = DefaultAutoscalingOptions + } + + for _, override := range c.OptionsOverrides { + override(&opts) + } + return opts +} + +// WithCloudProviderName sets the cloud provider name. +func WithCloudProviderName(name string) AutoscalingOptionOverride { + return func(o *config.AutoscalingOptions) { + o.CloudProviderName = name + } +} + +// WithScaleDownUnneededTime sets the scale down unneeded time option. +func WithScaleDownUnneededTime(d time.Duration) AutoscalingOptionOverride { + return func(o *config.AutoscalingOptions) { + o.NodeGroupDefaults.ScaleDownUnneededTime = d + } +} diff --git a/cluster-autoscaler/test/integration/framework.go b/cluster-autoscaler/test/integration/framework.go new file mode 100644 index 000000000000..458c3a22383a --- /dev/null +++ b/cluster-autoscaler/test/integration/framework.go @@ -0,0 +1,135 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "context" + "github.com/stretchr/testify/assert" + autoscalerbuilder "k8s.io/autoscaler/cluster-autoscaler/builder" + fakecloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/core" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" + "k8s.io/autoscaler/cluster-autoscaler/loop" + fakek8s "k8s.io/autoscaler/cluster-autoscaler/utils/fake" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "testing" + "testing/synctest" + "time" +) + +// TestContext acts as the bridge between the test logic and the simulation. +type TestContext struct { + t *testing.T + Ctx context.Context + Fakes *Fakes + Options *config.AutoscalingOptions + Autoscaler core.Autoscaler +} + +// Fakes is the struct used at test phase to make assertions. +type Fakes struct { + K8s *fakek8s.Kubernetes + Cloud *fakecloudprovider.CloudProvider +} + +// BuildAutoscaler initializes the core Autoscaler using the current Options and Fakes. +func (c *TestContext) BuildAutoscaler() { + c.t.Helper() + + var err error + + debuggingSnapshotter := debuggingsnapshot.NewDebuggingSnapshotter(false) + + c.Autoscaler, _, err = autoscalerbuilder.New(*c.Options). + WithKubeClient(c.Fakes.K8s.Client). + WithInformerFactory(c.Fakes.K8s.InformerFactory). + WithCloudProvider(c.Fakes.Cloud). + WithListerRegistry(c.Fakes.K8s.ListerRegistry()). + WithPodObserver(&loop.UnschedulablePodObserver{}). 
+ Build(c.Ctx, debuggingSnapshotter) + + assert.NoError(c.t, err) + +} + +// RunOnceAfter advances the virtual clock by the specified duration and then +// executes a single Cluster Autoscaler cycle. +func (c *TestContext) RunOnceAfter(d time.Duration) error { + c.t.Helper() + + // Ensure any pending work is done before changing the time. + synctest.Wait() + + time.Sleep(d) + err := c.Autoscaler.RunOnce(time.Now()) + + // Let side-effects of the RunOnce finish. + synctest.Wait() + return err +} + +// MustRunOnceAfter is a helper that calls RunOnceAfter and +// immediately fails the test if an error occurs. +// Use this for "happy path" simulation steps. +func (c *TestContext) MustRunOnceAfter(d time.Duration) { + c.t.Helper() + err := c.RunOnceAfter(d) + assert.NoError(c.t, err) +} + +// RunTest encapsulates the setup, execution, and teardown. +func RunTest(t *testing.T, config *Config, scenario func(*TestContext)) { + t.Helper() + + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer tearDown(cancel) + + options := config.ResolveOptions() + + kubeClient := fake.NewClientset() + + k8s := fakek8s.NewKubernetes(kubeClient, informers.NewSharedInformerFactory(kubeClient, 0)) + fakes := &Fakes{ + K8s: k8s, + Cloud: fakecloudprovider.NewCloudProvider(k8s), + } + + tc := &TestContext{ + t: t, + Ctx: ctx, + Fakes: fakes, + Options: &options, + } + + scenario(tc) + if ctx.Err() == context.DeadlineExceeded { + t.Errorf("Test timed out. This usually means a background goroutine is leaked or synctest cannot drain the bubble.") + } + }) +} + +func tearDown(cancel context.CancelFunc) { + cancel() + // Synctest drain: Background goroutines (like MetricAsyncRecorder) often use uninterruptible time.Sleep loops. + // In a synctest bubble, these are "durable" sleeps. We must advance the virtual clock to allow these goroutines to wake up, observe the + // closed context channel, and terminate gracefully. 
+ time.Sleep(1 * time.Minute) + synctest.Wait() +} diff --git a/cluster-autoscaler/test/integration/inmemory/staticautoscaler_test.go b/cluster-autoscaler/test/integration/inmemory/staticautoscaler_test.go new file mode 100644 index 000000000000..17451a9e255d --- /dev/null +++ b/cluster-autoscaler/test/integration/inmemory/staticautoscaler_test.go @@ -0,0 +1,99 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inmemory + +import ( + "k8s.io/autoscaler/cluster-autoscaler/test/integration" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + fakecloudprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +const ( + unneededTime = 1 * time.Minute +) + +func TestStaticAutoscaler_FullLifecycle(t *testing.T) { + testConfig := integration.NewConfig(). 
+ WithOverrides( + integration.WithCloudProviderName("gce"), + integration.WithScaleDownUnneededTime(unneededTime), + ) + + integration.RunTest(t, testConfig, func(ctx *integration.TestContext) { + ctx.BuildAutoscaler() + fakeK8s := ctx.Fakes.K8s + fakeCloud := ctx.Fakes.Cloud + + n1 := test.BuildTestNode("ng1-node-0", 1000, 1000, test.IsReady(true)) + fakeCloud.AddNodeGroup("ng1", fakecloudprovider.WithNode(n1)) + + fakeK8s.AddPod(test.BuildScheduledTestPod("p1", 600, 100, n1.Name)) + p2 := test.BuildTestPod("p2", 600, 100, test.MarkUnschedulable()) + fakeK8s.AddPod(p2) + + ctx.MustRunOnceAfter(1 * time.Minute) + + tg1, _ := fakeCloud.GetNodeGroup("ng1").TargetSize() + assert.Equal(t, 2, tg1) + + assert.Equal(t, 2, len(fakeK8s.Nodes().Items)) + + fakeK8s.DeletePod(p2.Namespace, p2.Name) + + // Detection and deletion steps. + ctx.MustRunOnceAfter(unneededTime) + ctx.MustRunOnceAfter(unneededTime) + + finalSize, _ := fakeCloud.GetNodeGroup("ng1").TargetSize() + assert.Equal(t, 1, finalSize) + }) +} + +func TestScaleUp_ResourceLimits(t *testing.T) { + testConfig := integration.NewConfig() + + integration.RunTest(t, testConfig, func(ctx *integration.TestContext) { + ctx.BuildAutoscaler() + fakeK8s := ctx.Fakes.K8s + fakeCloud := ctx.Fakes.Cloud + + n1 := test.BuildTestNode("ng1-node-0", 1000, 1000, test.IsReady(true)) + fakeCloud.AddNodeGroup("ng1", fakecloudprovider.WithNode(n1)) + + p1 := test.BuildTestPod("p1", 600, 100, test.MarkUnschedulable()) + fakeK8s.AddPod(p1) + + // Scale-up should be blocked. + fakeCloud.SetResourceLimit(cloudprovider.ResourceNameCores, 0, 1) + + ctx.MustRunOnceAfter(1 * time.Minute) + size, _ := fakeCloud.GetNodeGroup("ng1").TargetSize() + assert.Equal(t, 1, size, "Should not scale up when max cores limit is reached") + + // Scale-up should succeed. 
+ fakeCloud.SetResourceLimit(cloudprovider.ResourceNameCores, 0, 2) + + ctx.MustRunOnceAfter(1 * time.Minute) + newSize, _ := fakeCloud.GetNodeGroup("ng1").TargetSize() + assert.Equal(t, 2, newSize, "Should scale up after resource limit is increased") + }) +} diff --git a/cluster-autoscaler/utils/fake/kubernetes.go b/cluster-autoscaler/utils/fake/kubernetes.go new file mode 100644 index 000000000000..e221dff3846b --- /dev/null +++ b/cluster-autoscaler/utils/fake/kubernetes.go @@ -0,0 +1,77 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "context" + apiv1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" +) + +// Kubernetes encapsulates a fake Kubernetes client and its corresponding listers. +type Kubernetes struct { + Client *fake.Clientset + InformerFactory informers.SharedInformerFactory +} + +// NewKubernetes creates a new, fully wired fake Kubernetes simulation. +func NewKubernetes(client *fake.Clientset, factory informers.SharedInformerFactory) *Kubernetes { + return &Kubernetes{ + Client: client, + InformerFactory: factory, + } +} + +// AddNode adds a node to the fake client. 
+func (k *Kubernetes) AddNode(node *apiv1.Node) { + _, _ = k.Client.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) +} + +// UpdateNode updates a node. +func (k *Kubernetes) UpdateNode(node *apiv1.Node) { + _, _ = k.Client.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) +} + +// DeleteNode deletes a node. +func (k *Kubernetes) DeleteNode(name string) { + _ = k.Client.CoreV1().Nodes().Delete(context.TODO(), name, metav1.DeleteOptions{}) +} + +// Nodes lists all available nodes. +func (k *Kubernetes) Nodes() *corev1.NodeList { + nodes, _ := k.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + return nodes +} + +// AddPod adds a pod to the fake client. +func (k *Kubernetes) AddPod(pod *apiv1.Pod) { + _, _ = k.Client.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) +} + +// DeletePod deletes a pod. +func (k *Kubernetes) DeletePod(namespace, name string) { + _ = k.Client.CoreV1().Pods(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) +} + +// ListerRegistry returns a real ListerRegistry populated with the fake listers. +func (k *Kubernetes) ListerRegistry() kubernetes.ListerRegistry { + return kubernetes.NewListerRegistryWithDefaultListers(k.InformerFactory) +} diff --git a/cluster-autoscaler/utils/kubernetes/factory.go b/cluster-autoscaler/utils/kubernetes/factory.go index 8b74b37f4f6b..939683c3754e 100644 --- a/cluster-autoscaler/utils/kubernetes/factory.go +++ b/cluster-autoscaler/utils/kubernetes/factory.go @@ -17,6 +17,7 @@ limitations under the License. 
package kubernetes import ( + "context" "strings" clientv1 "k8s.io/api/core/v1" @@ -39,13 +40,17 @@ const ( ) // CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects -func CreateEventRecorder(kubeClient clientset.Interface, recordDuplicatedEvents bool) kube_record.EventRecorder { +func CreateEventRecorder(ctx context.Context, kubeClient clientset.Interface, recordDuplicatedEvents bool) kube_record.EventRecorder { var eventBroadcaster kube_record.EventBroadcaster if recordDuplicatedEvents { eventBroadcaster = kube_record.NewBroadcaster() } else { eventBroadcaster = kube_record.NewBroadcasterWithCorrelatorOptions(getCorrelationOptions()) } + go func() { + <-ctx.Done() + eventBroadcaster.Shutdown() + }() if _, isfake := kubeClient.(*fake.Clientset); !isfake { actualSink := &v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")} // EventBroadcaster has a StartLogging() method but the throttling options from getCorrelationOptions() get applied only to diff --git a/cluster-autoscaler/utils/taints/taints_test.go b/cluster-autoscaler/utils/taints/taints_test.go index c8898cd72ba2..34b50f0d0219 100644 --- a/cluster-autoscaler/utils/taints/taints_test.go +++ b/cluster-autoscaler/utils/taints/taints_test.go @@ -288,7 +288,7 @@ func TestCleanAllToBeDeleted(t *testing.T) { n2.Spec.Taints = []apiv1.Taint{{Key: ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}} fakeClient := buildFakeClient(t, n1, n2) - fakeRecorder := kube_util.CreateEventRecorder(fakeClient, false) + fakeRecorder := kube_util.CreateEventRecorder(context.TODO(), fakeClient, false) assert.Equal(t, 1, len(getNode(t, fakeClient, "n2").Spec.Taints)) @@ -304,7 +304,7 @@ func TestCleanAllDeletionCandidates(t *testing.T) { n2.Spec.Taints = []apiv1.Taint{{Key: DeletionCandidateTaintKey, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}} fakeClient := buildFakeClient(t, n1, n2) - 
fakeRecorder := kube_util.CreateEventRecorder(fakeClient, false) + fakeRecorder := kube_util.CreateEventRecorder(context.TODO(), fakeClient, false) assert.Equal(t, 1, len(getNode(t, fakeClient, "n2").Spec.Taints)) @@ -925,7 +925,7 @@ func TestCleanStaleDeletionCandidates(t *testing.T) { CleanStaleDeletionCandidates( tc.allNodes, fakeClient, - kube_util.CreateEventRecorder(fakeClient, false), + kube_util.CreateEventRecorder(context.TODO(), fakeClient, false), tc.nodeDeletionCandidateTTL, ) diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index e11f86a3bbbc..95333d0e73b0 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -327,8 +327,18 @@ func TolerateGpuForPod(pod *apiv1.Pod) { pod.Spec.Tolerations = append(pod.Spec.Tolerations, apiv1.Toleration{Key: resourceNvidiaGPU, Operator: apiv1.TolerationOpExists}) } +// NodeOption is a function that modifies a Node during construction. +type NodeOption func(*apiv1.Node) + +// IsReady sets the node to a Ready state. +func IsReady(ready bool) NodeOption { + return func(node *apiv1.Node) { + SetNodeReadyState(node, ready, time.Now()) + } +} + // BuildTestNode creates a node with specified capacity. -func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64) *apiv1.Node { +func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64, opts ...NodeOption) *apiv1.Node { node := &apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -357,6 +367,10 @@ func BuildTestNode(name string, millicpuCapacity int64, memCapacity int64) *apiv node.Status.Allocatable[k] = v } + for _, opt := range opts { + opt(node) + } + return node }