Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion commands/alpha/live/plan/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *Runner) RunE(c *cobra.Command, args []string) error {
}

// Create and execute the planner.
planner, err := kptplanner.NewClusterPlanner(r.factory)
planner, err := kptplanner.NewClusterPlannerWithContext(r.ctx, r.factory)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions commands/live/apply/cmdapply.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func runApply(r *Runner, invInfo inventory.Info, objs []*unstructured.Unstructur
if err = cmdutil.InstallResourceGroupCRD(r.ctx, f); err != nil {
return err
}
} else if !live.ResourceGroupCRDMatched(f) {
} else if !live.ResourceGroupCRDMatchedWithContext(r.ctx, f) {
if err = cmdutil.InstallResourceGroupCRD(r.ctx, f); err != nil {
return &cmdutil.ResourceGroupCRDNotLatestError{
Err: err,
Expand All @@ -239,7 +239,7 @@ func runApply(r *Runner, invInfo inventory.Info, objs []*unstructured.Unstructur

// Run the applier. It will return a channel where we can receive updates
// to keep track of progress and any issues.
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObjWithContext(r.ctx), live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion commands/live/destroy/cmddestroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *Runner) runE(c *cobra.Command, args []string) error {
func runDestroy(r *Runner, inv inventory.Info, dryRunStrategy common.DryRunStrategy) error {
// Run the destroyer. It will return a channel where we can receive updates
// to keep track of progress and any issues.
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
invClient, err := inventory.NewClient(r.factory, live.WrapInventoryObjWithContext(r.ctx), live.InvToUnstructuredFunc, r.statusPolicy, live.ResourceGroupGVK)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion commands/live/livecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GetCommand(ctx context.Context, _, version string) *cobra.Command {
}

f := util.NewFactory(liveCmd, version)
invFactory := live.NewClusterClientFactory()
invFactory := live.NewClusterClientFactoryWithContext(ctx)
loader := status.NewRGInventoryLoader(ctx, f)

// Init command which updates a Kptfile for the ResourceGroup inventory object.
Expand Down
14 changes: 7 additions & 7 deletions commands/live/migrate/migratecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type Runner struct {
name string
rgFile string
force bool
rgInvClientFunc func(util.Factory) (inventory.Client, error)
cmInvClientFunc func(util.Factory) (inventory.Client, error)
rgInvClientFunc func(context.Context, util.Factory) (inventory.Client, error)
cmInvClientFunc func(context.Context, util.Factory) (inventory.Client, error)
cmLoader manifestreader.ManifestLoader
cmNotMigrated bool // flag to determine if migration from ConfigMap has occurred
}
Expand Down Expand Up @@ -348,11 +348,11 @@ func validateParams(reader io.Reader, args []string) error {
return nil
}

func rgInvClient(factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, live.WrapInventoryObj, live.InvToUnstructuredFunc, inventory.StatusPolicyAll, live.ResourceGroupGVK)
func rgInvClient(ctx context.Context, factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, live.WrapInventoryObjWithContext(ctx), live.InvToUnstructuredFunc, inventory.StatusPolicyAll, live.ResourceGroupGVK)
}

func cmInvClient(factory util.Factory) (inventory.Client, error) {
func cmInvClient(_ context.Context, factory util.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, inventory.WrapInventoryObj, inventory.InvInfoToConfigMap, inventory.StatusPolicyAll, live.ResourceGroupGVK)
}

Expand Down Expand Up @@ -412,11 +412,11 @@ func (mr *Runner) migrateKptfileToRG(args []string) error {
func (mr *Runner) migrateCMToRG(stdinBytes []byte, args []string) error {
// Create the inventory clients for reading inventories based on RG and
// ConfigMap.
rgInvClient, err := mr.rgInvClientFunc(mr.factory)
rgInvClient, err := mr.rgInvClientFunc(mr.ctx, mr.factory)
if err != nil {
return err
}
cmInvClient, err := mr.cmInvClientFunc(mr.factory)
cmInvClient, err := mr.cmInvClientFunc(mr.ctx, mr.factory)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions commands/live/migrate/migratecmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package migrate

import (
"context"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestKptMigrate_migrateKptfileToRG(t *testing.T) {
migrateRunner := NewRunner(ctx, tf, cmLoader, ioStreams)
migrateRunner.dryRun = tc.dryRun
migrateRunner.rgFile = tc.rgFilename
migrateRunner.cmInvClientFunc = func(_ util.Factory) (inventory.Client, error) {
migrateRunner.cmInvClientFunc = func(_ context.Context, _ util.Factory) (inventory.Client, error) {
return inventory.NewFakeClient([]object.ObjMetadata{}), nil
}
err = migrateRunner.migrateKptfileToRG([]string{dir})
Expand Down Expand Up @@ -247,7 +248,7 @@ func TestKptMigrate_retrieveConfigMapInv(t *testing.T) {
// Create MigrateRunner and call "retrieveConfigMapInv"
cmLoader := manifestreader.NewManifestLoader(tf)
migrateRunner := NewRunner(ctx, tf, cmLoader, ioStreams)
migrateRunner.cmInvClientFunc = func(_ util.Factory) (inventory.Client, error) {
migrateRunner.cmInvClientFunc = func(_ context.Context, _ util.Factory) (inventory.Client, error) {
return inventory.NewFakeClient([]object.ObjMetadata{}), nil
}
actual, err := migrateRunner.retrieveConfigMapInv(strings.NewReader(tc.configMap), []string{"-"})
Expand Down
41 changes: 38 additions & 3 deletions pkg/live/inventory-client-factory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 The kpt Authors
// Copyright 2022,2026 The kpt Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,18 +15,53 @@
package live

import (
"context"

cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/inventory"
)

// ClusterClientFactory is a factory that creates instances of ClusterClient inventory client.
// ClusterClientFactory is a factory that creates instances of ClusterClient
// inventory client.
//
// Ctx, if set, is plumbed into the InventoryResourceGroup wrapper so
// Apply / ApplyWithPrune honor caller cancellation (Ctrl-C, timeouts).
// The upstream inventory.ClientFactory interface's NewClient signature
// does not accept a context, so we carry one on the factory instead;
// construct via NewClusterClientFactoryWithContext when you have one.
type ClusterClientFactory struct {
StatusPolicy inventory.StatusPolicy
Ctx context.Context
}

// NewClusterClientFactory returns a ClusterClientFactory that will build
// inventory clients with no context propagation (cluster API calls use
// context.Background()). Prefer NewClusterClientFactoryWithContext.
func NewClusterClientFactory() *ClusterClientFactory {
return &ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}
}

// NewClusterClientFactoryWithContext returns a ClusterClientFactory that
// threads ctx into every inventory client it produces.
//
// A nil ctx is normalized to context.Background() so the docstring's
// promise ("threads ctx into every inventory client") holds for every
// input: there is no hidden code path that silently drops propagation.
func NewClusterClientFactoryWithContext(ctx context.Context) *ClusterClientFactory {
Comment thread
Jaisheesh-2006 marked this conversation as resolved.
if ctx == nil {
ctx = context.Background()
}
return &ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone, Ctx: ctx}
}

func (ccf *ClusterClientFactory) NewClient(factory cmdutil.Factory) (inventory.Client, error) {
return inventory.NewClient(factory, WrapInventoryObj, InvToUnstructuredFunc, ccf.StatusPolicy, ResourceGroupGVK)
// Defense in depth: normalize a nil Ctx here too. This covers the
// case where a caller constructed ClusterClientFactory as a struct
// literal (e.g. &ClusterClientFactory{StatusPolicy: ...}) and left
// Ctx unset — see NewClusterClientFactory, which does exactly that.
ctx := ccf.Ctx
if ctx == nil {
ctx = context.Background()
}
return inventory.NewClient(factory, WrapInventoryObjWithContext(ctx), InvToUnstructuredFunc, ccf.StatusPolicy, ResourceGroupGVK)
}
82 changes: 72 additions & 10 deletions pkg/live/inventoryrg.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 The kpt Authors
// Copyright 2020,2026 The kpt Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,12 +59,30 @@ var ResourceGroupGVK = schema.GroupVersionKind{
// InventoryResourceGroup wraps a ResourceGroup resource and implements
// the Inventory and InventoryInfo interface. This wrapper loads and stores the
// object metadata (inventory) to and from the wrapped ResourceGroup.
//
// ctx, if non-nil, is the caller's context and is used for the live
// Kubernetes API calls performed by Apply / ApplyWithPrune. It exists on
// the struct (rather than as a parameter) because the upstream
// inventory.Storage interface signatures do not include a context; see
// WrapInventoryObjWithContext.
type InventoryResourceGroup struct {
ctx context.Context
inv *unstructured.Unstructured
objMetas []object.ObjMetadata
objStatus []actuation.ObjectStatus
}

// contextOrBackground returns the caller-supplied context if set, or
// context.Background() otherwise. Callers going through
// WrapInventoryObjWithContext get real cancellation/timeout propagation;
// legacy callers using WrapInventoryObj continue to work unchanged.
func (icm *InventoryResourceGroup) contextOrBackground() context.Context {
if icm.ctx != nil {
return icm.ctx
}
return context.Background()
}

func (icm *InventoryResourceGroup) Strategy() inventory.Strategy {
return inventory.NameStrategy
}
Expand All @@ -74,14 +92,40 @@ var _ inventory.Info = &InventoryResourceGroup{}

// WrapInventoryObj takes a passed ResourceGroup (as a resource.Info),
// wraps it with the InventoryResourceGroup and upcasts the wrapper as
// an the Inventory interface.
// the Inventory interface.
//
// The wrapped inventory will use context.Background() for cluster API
// calls. Prefer WrapInventoryObjWithContext when you have a caller
// context (e.g. cmd.Context()) so that Ctrl-C and caller-side timeouts
// actually cancel the in-flight request.
func WrapInventoryObj(obj *unstructured.Unstructured) inventory.Storage {
if obj != nil {
klog.V(4).Infof("wrapping Inventory obj: %s/%s\n", obj.GetNamespace(), obj.GetName())
}
return &InventoryResourceGroup{inv: obj}
}

// WrapInventoryObjWithContext returns a wrapper function compatible with
// inventory.NewClient's WrapObjFunc parameter. The returned function
// produces an InventoryResourceGroup that carries ctx, so subsequent
// Apply / ApplyWithPrune calls honor the caller's cancellation and
// timeout.
//
// If ctx is nil it is normalized to context.Background() so callers
// cannot accidentally trigger a nil-deref inside client-go. Prefer
// passing a real ctx (e.g. cmd.Context()) to actually gain cancellation.
func WrapInventoryObjWithContext(ctx context.Context) func(*unstructured.Unstructured) inventory.Storage {
if ctx == nil {
ctx = context.Background()
}
return func(obj *unstructured.Unstructured) inventory.Storage {
if obj != nil {
klog.V(4).Infof("wrapping Inventory obj with ctx: %s/%s\n", obj.GetNamespace(), obj.GetName())
}
return &InventoryResourceGroup{ctx: ctx, inv: obj}
}
}
Comment thread
Jaisheesh-2006 marked this conversation as resolved.

func WrapInventoryInfoObj(obj *unstructured.Unstructured) inventory.Info {
if obj != nil {
klog.V(4).Infof("wrapping InventoryInfo obj: %s/%s\n", obj.GetNamespace(), obj.GetName())
Expand Down Expand Up @@ -256,9 +300,10 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM
if err != nil {
return err
}
ctx := icm.contextOrBackground()

// Get cluster object, if exsists.
clusterObj, err := namespacedClient.Get(context.TODO(), invInfo.GetName(), metav1.GetOptions{})
// Get cluster object, if exists.
clusterObj, err := namespacedClient.Get(ctx, invInfo.GetName(), metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -267,10 +312,10 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM

if clusterObj == nil {
// Create cluster inventory object, if it does not exist on cluster.
appliedObj, err = namespacedClient.Create(context.TODO(), invInfo, metav1.CreateOptions{})
appliedObj, err = namespacedClient.Create(ctx, invInfo, metav1.CreateOptions{})
} else {
// Update the cluster inventory object instead.
appliedObj, err = namespacedClient.Update(context.TODO(), invInfo, metav1.UpdateOptions{})
appliedObj, err = namespacedClient.Update(ctx, invInfo, metav1.UpdateOptions{})
}
if err != nil {
return err
Expand All @@ -279,7 +324,7 @@ func (icm *InventoryResourceGroup) Apply(dc dynamic.Interface, mapper meta.RESTM
// Update status.
if statusPolicy == inventory.StatusPolicyAll {
invInfo.SetResourceVersion(appliedObj.GetResourceVersion())
_, err = namespacedClient.UpdateStatus(context.TODO(), invInfo, metav1.UpdateOptions{})
_, err = namespacedClient.UpdateStatus(ctx, invInfo, metav1.UpdateOptions{})
}

return err
Expand All @@ -290,11 +335,12 @@ func (icm *InventoryResourceGroup) ApplyWithPrune(dc dynamic.Interface, mapper m
if err != nil {
return err
}
ctx := icm.contextOrBackground()

// Update the cluster inventory object.
// Since the ResourceGroup CRD specifies the status as a sub-resource, this
// will not update the status.
appliedObj, err := namespacedClient.Update(context.TODO(), invInfo, metav1.UpdateOptions{})
appliedObj, err := namespacedClient.Update(ctx, invInfo, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -314,7 +360,7 @@ func (icm *InventoryResourceGroup) ApplyWithPrune(dc dynamic.Interface, mapper m
if err != nil {
return err
}
_, err = namespacedClient.UpdateStatus(context.TODO(), appliedObj, metav1.UpdateOptions{})
_, err = namespacedClient.UpdateStatus(ctx, appliedObj, metav1.UpdateOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -386,7 +432,23 @@ func ResourceGroupCRDApplied(factory cmdutil.Factory) bool {

// ResourceGroupCRDMatched checks if the ResourceGroup CRD
// in the cluster matches the CRD in the kpt binary.
//
// This signature is preserved for backward compatibility with external
// callers; it delegates to ResourceGroupCRDMatchedWithContext with
// context.Background(). Prefer ResourceGroupCRDMatchedWithContext when
// you have a caller context so Ctrl-C and timeouts can abort the check.
func ResourceGroupCRDMatched(factory cmdutil.Factory) bool {
return ResourceGroupCRDMatchedWithContext(context.Background(), factory)
}

// ResourceGroupCRDMatchedWithContext is the context-aware variant of
// ResourceGroupCRDMatched. ctx is used for the live cluster Get call;
// cancelling it (e.g. via Ctrl-C or a command-level timeout) aborts the
// check. A nil ctx is normalized to context.Background().
func ResourceGroupCRDMatchedWithContext(ctx context.Context, factory cmdutil.Factory) bool {
if ctx == nil {
ctx = context.Background()
}
mapper, err := factory.ToRESTMapper()
if err != nil {
klog.V(4).Infof("error retrieving RESTMapper when checking ResourceGroup CRD: %s\n", err)
Expand All @@ -410,7 +472,7 @@ func ResourceGroupCRDMatched(factory cmdutil.Factory) bool {
return false
}

Comment thread
Jaisheesh-2006 marked this conversation as resolved.
liveCRD, err := dc.Resource(mapping.Resource).Get(context.TODO(), "resourcegroups.kpt.dev", metav1.GetOptions{
liveCRD, err := dc.Resource(mapping.Resource).Get(ctx, "resourcegroups.kpt.dev", metav1.GetOptions{
TypeMeta: metav1.TypeMeta{
APIVersion: crd.GetAPIVersion(),
Kind: "CustomResourceDefinition",
Expand Down
Loading
Loading