Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion commands/alpha/live/plan/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *Runner) RunE(c *cobra.Command, args []string) error {
}

// Create and execute the planner.
planner, err := kptplanner.NewClusterPlanner(r.factory)
planner, err := kptplanner.NewClusterPlanner(r.ctx, r.factory)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions commands/live/apply/cmdapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func runApply(r *Runner, invInfo inventory.Info, objs []*unstructured.Unstructur
if err = cmdutil.InstallResourceGroupCRD(r.ctx, f); err != nil {
return err
}
} else if !live.ResourceGroupCRDMatched(f) {
} else if !live.ResourceGroupCRDMatched(r.ctx, f) {
if err = cmdutil.InstallResourceGroupCRD(r.ctx, f); err != nil {
return &cmdutil.ResourceGroupCRDNotLatestError{
Err: err,
Expand All @@ -239,7 +239,7 @@ func runApply(r *Runner, invInfo inventory.Info, objs []*unstructured.Unstructur

// Run the applier. It will return a channel where we can receive updates
// to keep track of progress and any issues.
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObjWithContext(r.ctx), live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion commands/live/destroy/cmddestroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *Runner) runE(c *cobra.Command, args []string) error {
func runDestroy(r *Runner, inv inventory.Info, dryRunStrategy common.DryRunStrategy) error {
// Run the destroyer. It will return a channel where we can receive updates
// to keep track of progress and any issues.
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObjWithContext(r.ctx), live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion commands/live/livecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GetCommand(ctx context.Context, _, version string) *cobra.Command {
}

f := util.NewFactory(liveCmd, version)
invFactory := live.NewClusterClientFactory()
invFactory := live.NewClusterClientFactoryWithContext(ctx)
loader := status.NewRGInventoryLoader(ctx, f)

// Init command which updates a Kptfile for the ResourceGroup inventory object.
Expand Down
14 changes: 7 additions & 7 deletions commands/live/migrate/migratecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type Runner struct {
name string
rgFile string
force bool
rgInvClientFunc func(util.Factory) (inventory.Client, error)
cmInvClientFunc func(util.Factory) (inventory.Client, error)
rgInvClientFunc func(context.Context, util.Factory) (inventory.Client, error)
cmInvClientFunc func(context.Context, util.Factory) (inventory.Client, error)
cmLoader manifestreader.ManifestLoader
cmNotMigrated bool // flag to determine if migration from ConfigMap has occurred
}
Expand Down Expand Up @@ -348,11 +348,11 @@ func validateParams(reader io.Reader, args []string) error {
return nil
}

func rgInvClient(factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, inventory.StatusPolicyAll, live.ResourceGroupGVK)
func rgInvClient(ctx context.Context, factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, live.WrapInventoryObjWithContext(ctx), live.InvToUnstructuredFunc, inventory.StatusPolicyAll, live.ResourceGroupGVK)
}

func cmInvClient(factory util.Factory) (inventory.Client, error) {
func cmInvClient(_ context.Context, factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, inventory.WrapInventoryObj, inventory.InvInfoToConfigMap, inventory.StatusPolicyAll, live.ResourceGroupGVK)
}

Expand Down Expand Up @@ -412,11 +412,11 @@ func (mr *Runner) migrateKptfileToRG(args []string) error {
func (mr *Runner) migrateCMToRG(stdinBytes []byte, args []string) error {
// Create the inventory clients for reading inventories based on RG and
// ConfigMap.
rgInvClient, err := mr.rgInvClientFunc(mr.factory)
rgInvClient, err := mr.rgInvClientFunc(mr.ctx, mr.factory)
if err != nil {
return err
}
cmInvClient, err := mr.cmInvClientFunc(mr.factory)
cmInvClient, err := mr.cmInvClientFunc(mr.ctx, mr.factory)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions commands/live/migrate/migratecmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package migrate

import (
"context"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestKptMigrate_migrateKptfileToRG(t *testing.T) {
migrateRunner := NewRunner(ctx, tf, cmLoader, ioStreams)
migrateRunner.dryRun = tc.dryRun
migrateRunner.rgFile = tc.rgFilename
migrateRunner.cmInvClientFunc = func(_ util.Factory) (inventory.Client, error) {
migrateRunner.cmInvClientFunc = func(_ context.Context, _ util.Factory) (inventory.Client, error) {
return inventory.NewFakeClient([]object.ObjMetadata{}), nil
}
err = migrateRunner.migrateKptfileToRG([]string{dir})
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestKptMigrate_retrieveConfigMapInv(t *testing.T) {
// Create MigrateRunner and call "retrieveConfigMapInv"
cmLoader := manifestreader.NewManifestLoader(tf)
migrateRunner := NewRunner(ctx, tf, cmLoader, ioStreams)
migrateRunner.cmInvClientFunc = func(_ util.Factory) (inventory.Client, error) {
migrateRunner.cmInvClientFunc = func(_ context.Context, _ util.Factory) (inventory.Client, error) {
return inventory.NewFakeClient([]object.ObjMetadata{}), nil
}
actual, err := migrateRunner.retrieveConfigMapInv(strings.NewReader(tc.configMap), []string{"-"})
Expand Down
30 changes: 27 additions & 3 deletions pkg/live/inventory-client-factory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The kpt Authors
// Copyright 2022,2026 The kpt Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,18 +15,42 @@
package live

import (
"context"

cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/inventory"
)

// ClusterClientFactory is a factory that creates instances of ClusterClient inventory client.
// ClusterClientFactory is a factory that creates instances of ClusterClient
// inventory client.
//
// Ctx, if set, is plumbed into the InventoryResourceGroup wrapper so
// Apply / ApplyWithPrune honor caller cancellation (Ctrl-C, timeouts).
// The upstream inventory.ClientFactory interface's NewClient signature
// does not accept a context, so we carry one on the factory instead;
// construct via NewClusterClientFactoryWithContext when you have one.
type ClusterClientFactory struct {
StatusPolicy inventory.StatusPolicy
Ctx context.Context
}

// NewClusterClientFactory returns a ClusterClientFactory that will build
// inventory clients with no context propagation (cluster API calls use
// context.Background()). Prefer NewClusterClientFactoryWithContext.
func NewClusterClientFactory() *ClusterClientFactory {
return &ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}
}

// NewClusterClientFactoryWithContext returns a ClusterClientFactory that
// threads ctx into every inventory client it produces.
func NewClusterClientFactoryWithContext(ctx context.Context) *ClusterClientFactory {
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
return &ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone, Ctx: ctx}
}

func (ccf *ClusterClientFactory) NewClient(factory cmdutil.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, WrapInventoryObj, InvToUnstructuredFunc, ccf.StatusPolicy, ResourceGroupGVK)
wrap := WrapInventoryObj
if ccf.Ctx != nil {
wrap = WrapInventoryObjWithContext(ccf.Ctx)
}
return inventory.NewClient(factory, wrap, InvToUnstructuredFunc, ccf.StatusPolicy, ResourceGroupGVK)
}
61 changes: 52 additions & 9 deletions pkg/live/inventoryrg.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The kpt Authors
// Copyright 2020,2026 The kpt Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,12 +59,30 @@ var ResourceGroupGVK = schema.GroupVersionKind{
// InventoryResourceGroup wraps a ResourceGroup resource and implements
// the Inventory and InventoryInfo interface. This wrapper loads and stores the
// object metadata (inventory) to and from the wrapped ResourceGroup.
//
// ctx, if non-nil, is the caller's context and is used for the live
// Kubernetes API calls performed by Apply / ApplyWithPrune. It exists on
// the struct (rather than as a parameter) because the upstream
// inventory.Storage interface signatures do not include a context; see
// WrapInventoryObjWithContext.
type InventoryResourceGroup struct {
ctx context.Context
inv *unstructured.Unstructured
objMetas []object.ObjMetadata
objStatus []actuation.ObjectStatus
}

// contextOrBackground returns the caller-supplied context if set, or
// context.Background() otherwise. Callers going through
// WrapInventoryObjWithContext get real cancellation/timeout propagation;
// legacy callers using WrapInventoryObj continue to work unchanged.
func (icm *InventoryResourceGroup) contextOrBackground() context.Context {
if icm.ctx != nil {
return icm.ctx
}
return context.Background()
}

func (icm *InventoryResourceGroup) Strategy() inventory.Strategy {
return inventory.NameStrategy
}
Expand All @@ -75,13 +93,33 @@ var _ inventory.Info = &InventoryResourceGroup{}
// WrapInventoryObj takes a passed ResourceGroup (as a resource.Info),
// wraps it with the InventoryResourceGroup and upcasts the wrapper as
// an the Inventory interface.
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
Outdated
//
// The wrapped inventory will use context.Background() for cluster API
// calls. Prefer WrapInventoryObjWithContext when you have a caller
// context (e.g. cmd.Context()) so that Ctrl-C and caller-side timeouts
// actually cancel the in-flight request.
func WrapInventoryObj(obj *unstructured.Unstructured) inventory.Storage {
if obj != nil {
klog.V(4).Infof("wrapping Inventory obj: %s/%s\n", obj.GetNamespace(), obj.GetName())
}
return &InventoryResourceGroup{inv: obj}
}

// WrapInventoryObjWithContext returns a wrapper function compatible with
// inventory.NewClient's WrapObjFunc parameter. The returned function
// produces an InventoryResourceGroup that carries ctx, so subsequent
// Apply / ApplyWithPrune calls honor the caller's cancellation and
// timeout. ctx MUST NOT be nil; pass context.Background() explicitly if
// you truly want no cancellation.
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
Outdated
func WrapInventoryObjWithContext(ctx context.Context) func(*unstructured.Unstructured) inventory.Storage {
return func(obj *unstructured.Unstructured) inventory.Storage {
if obj != nil {
klog.V(4).Infof("wrapping Inventory obj with ctx: %s/%s\n", obj.GetNamespace(), obj.GetName())
}
return &InventoryResourceGroup{ctx: ctx, inv: obj}
}
}
Comment thread
Jaisheesh-2006 marked this conversation as resolved.

func WrapInventoryInfoObj(obj *unstructured.Unstructured) inventory.Info {
if obj != nil {
klog.V(4).Infof("wrapping InventoryInfo obj: %s/%s\n", obj.GetNamespace(), obj.GetName())
Expand Down Expand Up @@ -256,9 +294,10 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM
if err != nil {
return err
}
ctx := icm.contextOrBackground()

// Get cluster object, if exsists.
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
Outdated
clusterObj, err := namespacedClient.Get(context.TODO(), invInfo.GetName(), metav1.GetOptions{})
clusterObj, err := namespacedClient.Get(ctx, invInfo.GetName(), metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -267,10 +306,10 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM

if clusterObj == nil {
// Create cluster inventory object, if it does not exist on cluster.
appliedObj, err = namespacedClient.Create(context.TODO(), invInfo, metav1.CreateOptions{})
appliedObj, err = namespacedClient.Create(ctx, invInfo, metav1.CreateOptions{})
} else {
// Update the cluster inventory object instead.
appliedObj, err = namespacedClient.Update(context.TODO(), invInfo, metav1.UpdateOptions{})
appliedObj, err = namespacedClient.Update(ctx, invInfo, metav1.UpdateOptions{})
}
if err != nil {
return err
Expand All @@ -279,7 +318,7 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM
// Update status.
if statusPolicy == inventory.StatusPolicyAll {
invInfo.SetResourceVersion(appliedObj.GetResourceVersion())
_, err = namespacedClient.UpdateStatus(context.TODO(), invInfo, metav1.UpdateOptions{})
_, err = namespacedClient.UpdateStatus(ctx, invInfo, metav1.UpdateOptions{})
}

return err
Expand All @@ -290,11 +329,12 @@ func (icm *InventoryResourceGroup) ApplyWithPrune(dc dynamic.Interface, mapper m
if err != nil {
return err
}
ctx := icm.contextOrBackground()

// Update the cluster inventory object.
// Since the ResourceGroup CRD specifies the status as a sub-resource, this
// will not update the status.
appliedObj, err := namespacedClient.Update(context.TODO(), invInfo, metav1.UpdateOptions{})
appliedObj, err := namespacedClient.Update(ctx, invInfo, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -314,7 +354,7 @@ func (icm *InventoryResourceGroup) ApplyWithPrune(dc dynamic.Interface, mapper m
if err != nil {
return err
}
_, err = namespacedClient.UpdateStatus(context.TODO(), appliedObj, metav1.UpdateOptions{})
_, err = namespacedClient.UpdateStatus(ctx, appliedObj, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -386,7 +426,10 @@ func ResourceGroupCRDApplied(factory cmdutil.Factory) bool {

// ResourceGroupCRDMatched checks if the ResourceGroup CRD
// in the cluster matches the CRD in the kpt binary.
func ResourceGroupCRDMatched(factory cmdutil.Factory) bool {
//
// ctx is used for the live cluster Get call; cancelling it (e.g. via
// Ctrl-C or a command-level timeout) aborts the check.
func ResourceGroupCRDMatched(ctx context.Context, factory cmdutil.Factory) bool {
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
Outdated
mapper, err := factory.ToRESTMapper()
if err != nil {
klog.V(4).Infof("error retrieving RESTMapper when checking ResourceGroup CRD: %s\n", err)
Expand All @@ -410,7 +453,7 @@ func ResourceGroupCRDMatched(factory cmdutil.Factory) bool {
return false
}

Comment thread
Jaisheesh-2006 marked this conversation as resolved.
liveCRD, err := dc.Resource(mapping.Resource).Get(context.TODO(), "resourcegroups.kpt.dev", metav1.GetOptions{
liveCRD, err := dc.Resource(mapping.Resource).Get(ctx, "resourcegroups.kpt.dev", metav1.GetOptions{
TypeMeta: metav1.TypeMeta{
APIVersion: crd.GetAPIVersion(),
Kind: "CustomResourceDefinition",
Expand Down
74 changes: 73 additions & 1 deletion pkg/live/inventoryrg_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The kpt Authors
// Copyright 2020,2026 The kpt Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package live

import (
"context"
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -240,3 +241,74 @@ func TestIsResourceGroupInventory(t *testing.T) {
})
}
}

// TestWrapInventoryObjWithContext_StoresContext proves the new factory
// threads the caller's context into the InventoryResourceGroup struct.
// This is the mechanism the Apply / ApplyWithPrune methods use to honor
// Ctrl-C / caller timeouts instead of the old context.TODO() behavior.
func TestWrapInventoryObjWithContext_StoresContext(t *testing.T) {
type ctxKey struct{}
ctx := context.WithValue(context.Background(), ctxKey{}, "propagated")

storage := WrapInventoryObjWithContext(ctx)(inventoryObj)
icm, ok := storage.(*InventoryResourceGroup)
if !ok {
t.Fatalf("WrapInventoryObjWithContext produced unexpected type %T", storage)
}
if icm.ctx == nil {
t.Fatal("expected ctx on InventoryResourceGroup; got nil")
}
if got := icm.ctx.Value(ctxKey{}); got != "propagated" {
t.Fatalf("expected stored ctx to carry propagated value; got %v", got)
}
}

// TestWrapInventoryObj_LeavesContextNil confirms the legacy wrapper keeps
// ctx nil so contextOrBackground falls back to context.Background() —
// preserving the pre-refactor behavior for callers that haven't migrated.
func TestWrapInventoryObj_LeavesContextNil(t *testing.T) {
storage := WrapInventoryObj(inventoryObj)
icm, ok := storage.(*InventoryResourceGroup)
if !ok {
t.Fatalf("WrapInventoryObj produced unexpected type %T", storage)
}
if icm.ctx != nil {
t.Fatalf("expected legacy wrapper to leave ctx nil; got %v", icm.ctx)
}
}

// TestContextOrBackground covers both the override path (caller-supplied
// ctx is returned verbatim, including cancellation state) and the
// fallback path (nil ctx becomes context.Background()).
func TestContextOrBackground(t *testing.T) {
t.Run("returns stored ctx when set", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
icm := &InventoryResourceGroup{ctx: ctx}

got := icm.contextOrBackground()
if got != ctx {
t.Fatalf("expected contextOrBackground to return the stored ctx")
}
// Cancellation on the original ctx must be visible through the
// returned ctx — proof the value isn't copied or unwrapped.
cancel()
select {
case <-got.Done():
// expected
default:
t.Fatalf("returned ctx did not observe cancellation of the stored ctx")
}
})

t.Run("falls back to Background when nil", func(t *testing.T) {
icm := &InventoryResourceGroup{}
got := icm.contextOrBackground()
if got == nil {
t.Fatal("contextOrBackground returned nil; expected context.Background()")
}
// Background() never cancels; Done() returns a nil channel.
if got.Done() != nil {
t.Fatalf("expected Background-equivalent ctx; Done channel was not nil")
}
})
}
Loading
Loading