Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions pkg/hook/controller/hook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,25 @@ func (hc *HookController) UnlockKubernetesEventsFor(monitorID string) {
}
}

func (hc *HookController) StopMonitors() {
func (hc *HookController) UpdateMonitor(monitorId string, kind, apiVersion string) error {
if hc.KubernetesController != nil {
hc.KubernetesController.StopMonitors()
return hc.KubernetesController.UpdateMonitor(monitorId, kind, apiVersion)
}
return nil
}

func (hc *HookController) UpdateMonitor(monitorId string, kind, apiVersion string) error {
func (hc *HookController) EnableKubernetesBindings() ([]BindingExecutionInfo, error) {
if hc.KubernetesController != nil {
return hc.KubernetesController.UpdateMonitor(monitorId, kind, apiVersion)
return hc.KubernetesController.EnableKubernetesBindings()
}

return nil, nil
}

func (hc *HookController) DisableKubernetesBindings() {
if hc.KubernetesController != nil {
hc.KubernetesController.DisableKubernetesBindings()
}
return nil
}

func (hc *HookController) EnableScheduleBindings() {
Expand Down
49 changes: 31 additions & 18 deletions pkg/hook/controller/kubernetes_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/deckhouse/deckhouse/pkg/log"

pkg "github.com/flant/shell-operator/pkg"
"github.com/flant/shell-operator/pkg"
bctx "github.com/flant/shell-operator/pkg/hook/binding_context"
htypes "github.com/flant/shell-operator/pkg/hook/types"
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
Expand All @@ -27,10 +27,10 @@ type KubernetesBindingsController interface {
WithKubernetesBindings([]htypes.OnKubernetesEventConfig)
WithKubeEventsManager(kubeeventsmanager.KubeEventsSource)
EnableKubernetesBindings() ([]BindingExecutionInfo, error)
DisableKubernetesBindings()
UpdateMonitor(monitorId string, kind, apiVersion string) error
UnlockEvents()
UnlockEventsFor(monitorID string)
StopMonitors()
CanHandleEvent(kubeEvent kemtypes.KubeEvent) bool
HandleEvent(ctx context.Context, kubeEvent kemtypes.KubeEvent) BindingExecutionInfo
BindingNames() []string
Expand Down Expand Up @@ -82,17 +82,25 @@ func (c *kubernetesBindingsController) WithKubeEventsManager(kubeEventsManager k
func (c *kubernetesBindingsController) EnableKubernetesBindings() ([]BindingExecutionInfo, error) {
res := make([]BindingExecutionInfo, 0)

c.l.RLock()
alreadyEnabled := len(c.BindingMonitorLinks) == len(c.KubernetesBindings)
c.l.RUnlock()
if alreadyEnabled {
return res, nil
}

for _, config := range c.KubernetesBindings {
err := c.kubeEventsManager.AddMonitor(config.Monitor)
if err != nil {
return nil, fmt.Errorf("run monitor: %s", err)
if _, found := c.getBindingMonitorLinksById(config.Monitor.Metadata.MonitorId); !found {
if err := c.kubeEventsManager.AddMonitor(config.Monitor); err != nil {
return nil, fmt.Errorf("run monitor: %s", err)
}
c.setBindingMonitorLinks(config.Monitor.Metadata.MonitorId, &KubernetesBindingToMonitorLink{
MonitorId: config.Monitor.Metadata.MonitorId,
BindingConfig: config,
})
// Start monitor's informers to fill the cache.
c.kubeEventsManager.StartMonitor(config.Monitor.Metadata.MonitorId)
}
c.setBindingMonitorLinks(config.Monitor.Metadata.MonitorId, &KubernetesBindingToMonitorLink{
MonitorId: config.Monitor.Metadata.MonitorId,
BindingConfig: config,
})
// Start monitor's informers to fill the cache.
c.kubeEventsManager.StartMonitor(config.Monitor.Metadata.MonitorId)

synchronizationInfo := c.HandleEvent(context.TODO(), kemtypes.KubeEvent{
MonitorId: config.Monitor.Metadata.MonitorId,
Expand Down Expand Up @@ -173,13 +181,18 @@ func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) {
m.EnableKubeEventCb()
}

// StopMonitors stops all monitors for the hook.
// TODO handle error!
func (c *kubernetesBindingsController) StopMonitors() {
c.iterateBindingMonitorLinks(func(monitorID string) bool {
_ = c.kubeEventsManager.StopMonitor(monitorID)
return false
})
func (c *kubernetesBindingsController) DisableKubernetesBindings() {
c.l.Lock()
ids := make([]string, 0, len(c.BindingMonitorLinks))
for id := range c.BindingMonitorLinks {
ids = append(ids, id)
}
c.BindingMonitorLinks = make(map[string]*KubernetesBindingToMonitorLink)
c.l.Unlock()

for _, id := range ids {
_ = c.kubeEventsManager.StopMonitor(id)
}
}

func (c *kubernetesBindingsController) CanHandleEvent(kubeEvent kemtypes.KubeEvent) bool {
Expand Down
12 changes: 5 additions & 7 deletions pkg/kube_events_manager/kube_events_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage"

klient "github.com/flant/kube-client/client"
pkg "github.com/flant/shell-operator/pkg"
"github.com/flant/shell-operator/pkg"
kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
)

Expand Down Expand Up @@ -92,9 +92,8 @@ func (mgr *kubeEventsManager) WithMetricStorage(mstor metricsstorage.Storage) {
// TODO cleanup informers in case of error
// TODO use Context to stop informers
func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error {
log.Debug("Add MONITOR",
slog.String(pkg.LogKeyConfig, fmt.Sprintf("%+v", monitorConfig)))
monitor := NewMonitor(
mgr.logger.Debug("add kubernetes monitor", slog.String(pkg.LogKeyConfig, fmt.Sprintf("%+v", monitorConfig)))
mon := NewMonitor(
mgr.ctx,
mgr.KubeClient,
mgr.metricStorage,
Expand All @@ -107,13 +106,12 @@ func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error {
mgr.logger.Named("monitor"),
)

err := monitor.CreateInformers()
if err != nil {
if err := mon.CreateInformers(); err != nil {
return err
}

mgr.m.Lock()
mgr.Monitors[monitorConfig.Metadata.MonitorId] = monitor
mgr.Monitors[monitorConfig.Metadata.MonitorId] = mon
mgr.m.Unlock()

return nil
Expand Down
2 changes: 1 addition & 1 deletion test/hook/context/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ func (b *BindingContextController) RunBindingWithAllSnapshots(binding types.Bind

func (b *BindingContextController) Stop() {
if b.HookCtrl != nil {
b.HookCtrl.StopMonitors()
b.HookCtrl.DisableKubernetesBindings()
}
}
Loading