Skip to content
Draft
3 changes: 2 additions & 1 deletion comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

api "github.com/DataDog/datadog-agent/comp/api/api/def"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/discovery"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers"
Expand Down Expand Up @@ -201,7 +202,7 @@ func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolv
if h, ok := hp.Get(); ok {
hpComp = h
}
cfgMgr := newReconcilingConfigManager(secretResolver, hpComp)
cfgMgr := newReconcilingConfigManager(secretResolver, hpComp, discovery.NewOpenMetricsProber())
ac := &AutoConfig{
configPollers: make([]*configPoller, 0, 9),
listenerCandidates: make(map[string]*listenerCandidate),
Expand Down
28 changes: 26 additions & 2 deletions comp/core/autodiscovery/autodiscoveryimpl/configmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package autodiscoveryimpl

import (
"context"
"fmt"
"maps"
"sync"

healthplatformpayload "github.com/DataDog/agent-payload/v5/healthplatform"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/configresolver"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/discovery"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
Expand Down Expand Up @@ -110,12 +112,14 @@ type reconcilingConfigManager struct {

secretResolver secrets.Component
healthPlatform healthplatformdef.Component
prober discovery.Prober
}

var _ configManager = &reconcilingConfigManager{}

// newReconcilingConfigManager creates a new, empty reconcilingConfigManager.
func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatform healthplatformdef.Component) configManager {
// prober may be nil; templates with Discovery set will be skipped if so.
func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatform healthplatformdef.Component, prober discovery.Prober) configManager {
return &reconcilingConfigManager{
activeConfigs: map[string]integration.Config{},
activeServices: map[string]serviceAndADIDs{},
Expand All @@ -125,6 +129,7 @@ func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatfor
scheduledConfigs: map[string]integration.Config{},
secretResolver: secretResolver,
healthPlatform: healthPlatform,
prober: prober,
}
}

Expand Down Expand Up @@ -408,7 +413,26 @@ func (cm *reconcilingConfigManager) reconcileService(svcID string) integration.C
// returns false.
func (cm *reconcilingConfigManager) resolveTemplateForService(tpl integration.Config, svc listeners.Service) (integration.Config, bool) {
digest := tpl.Digest()
config, err := configresolver.Resolve(tpl, svc)
resolvedSvc := svc

if tpl.Discovery != nil {
if cm.prober == nil {
msg := fmt.Sprintf("template %s has Discovery set but no prober is configured", tpl.Name)
log.Errorf("autodiscovery: %s", msg)
errorStats.setResolveWarning(tpl.Name, msg)
return tpl, false
}
result, ok := cm.prober.Probe(context.Background(), tpl.Discovery, svc)
if !ok {
msg := fmt.Sprintf("discovery probe did not match for template %s and service %s", tpl.Name, svc.GetServiceID())
log.Debugf("autodiscovery: %s", msg)
errorStats.setResolveWarning(tpl.Name, msg)
return tpl, false
}
resolvedSvc = discovery.WrapWithProbeResult(svc, result)
}

config, err := configresolver.Resolve(tpl, resolvedSvc)
if err != nil {
msg := fmt.Sprintf("error resolving template %s for service %s: %v", tpl.Name, svc.GetServiceID(), err)
log.Errorf("autodiscovery: skipping config - %s", msg)
Expand Down
6 changes: 3 additions & 3 deletions comp/core/autodiscovery/autodiscoveryimpl/configmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestReconcilingConfigManagement(t *testing.T) {
mockResolver := MockSecretResolver{}
suite.Run(t, &ReconcilingConfigManagerSuite{
ConfigManagerSuite{factory: func() configManager {
return newReconcilingConfigManager(&mockResolver, nil)
return newReconcilingConfigManager(&mockResolver, nil, nil)
}},
})
}
Expand All @@ -563,7 +563,7 @@ func TestResolveTemplateForService_ReportsToHealthPlatform(t *testing.T) {
mockResolver := MockSecretResolver{}
hp := healthplatformmock.Mock(t)

cm := newReconcilingConfigManager(&mockResolver, hp).(*reconcilingConfigManager)
cm := newReconcilingConfigManager(&mockResolver, hp, nil).(*reconcilingConfigManager)

tpl := integration.Config{
Name: "postgres",
Expand Down Expand Up @@ -596,7 +596,7 @@ func TestResolveTemplateForService_ClearsHealthPlatformOnSuccess(t *testing.T) {
mockResolver := MockSecretResolver{}
hp := healthplatformmock.Mock(t)

cm := newReconcilingConfigManager(&mockResolver, hp).(*reconcilingConfigManager)
cm := newReconcilingConfigManager(&mockResolver, hp, nil).(*reconcilingConfigManager)

tpl := integration.Config{
Name: "redis",
Expand Down
60 changes: 60 additions & 0 deletions comp/core/autodiscovery/discovery/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package discovery

import (
"sync"
"time"
)

type cacheEntry struct {
result ProbeResult
success bool
expiresAt time.Time // zero = never
}

type probeCache struct {
mu sync.Mutex
entries map[string]cacheEntry
now func() time.Time
}

func newProbeCache(now func() time.Time) *probeCache {
if now == nil {
now = time.Now
}
return &probeCache{entries: make(map[string]cacheEntry), now: now}
}

func cacheKey(svcID, cfgHash string) string {
return svcID + "|" + cfgHash
}

func (c *probeCache) get(svcID, cfgHash string) (ProbeResult, bool, bool) {
c.mu.Lock()
defer c.mu.Unlock()
e, ok := c.entries[cacheKey(svcID, cfgHash)]
if !ok {
return ProbeResult{}, false, false
}
if !e.expiresAt.IsZero() && c.now().After(e.expiresAt) {
delete(c.entries, cacheKey(svcID, cfgHash))
return ProbeResult{}, false, false
}
return e.result, e.success, true
}

func (c *probeCache) putSuccess(svcID, cfgHash string, r ProbeResult) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[cacheKey(svcID, cfgHash)] = cacheEntry{result: r, success: true}
}

func (c *probeCache) putFailure(svcID, cfgHash string, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[cacheKey(svcID, cfgHash)] = cacheEntry{success: false, expiresAt: c.now().Add(ttl)}
}
51 changes: 51 additions & 0 deletions comp/core/autodiscovery/discovery/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package discovery

import (
"testing"
"time"
)

func TestProbeCache_HitAndExpiry(t *testing.T) {
now := time.Unix(1_700_000_000, 0)
clock := func() time.Time { return now }
c := newProbeCache(clock)

// Empty cache — miss.
if _, _, ok := c.get("svc1", "h1"); ok {
t.Fatal("expected miss on empty cache")
}

// Successful probe entry, never expires.
c.putSuccess("svc1", "h1", ProbeResult{Port: 8090})
if r, success, ok := c.get("svc1", "h1"); !ok || !success || r.Port != 8090 {
t.Fatalf("expected hit success(8090); got ok=%v success=%v port=%d", ok, success, r.Port)
}

// Failed probe entry, expires after 30s.
c.putFailure("svc1", "h2", 30*time.Second)
if _, success, ok := c.get("svc1", "h2"); !ok || success {
t.Fatal("expected hit failure")
}
now = now.Add(31 * time.Second)
if _, _, ok := c.get("svc1", "h2"); ok {
t.Fatal("expected miss after expiry")
}
}

func TestProbeCache_DifferentKeysIsolated(t *testing.T) {
now := time.Unix(0, 0)
c := newProbeCache(func() time.Time { return now })
c.putSuccess("svcA", "h1", ProbeResult{Port: 1})
c.putSuccess("svcB", "h1", ProbeResult{Port: 2})
if r, _, _ := c.get("svcA", "h1"); r.Port != 1 {
t.Fatalf("svcA: got %d", r.Port)
}
if r, _, _ := c.get("svcB", "h1"); r.Port != 2 {
t.Fatalf("svcB: got %d", r.Port)
}
}
43 changes: 43 additions & 0 deletions comp/core/autodiscovery/discovery/candidates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package discovery

import (
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
)

func candidatePorts(hints []int, exposed []workloadmeta.ContainerPort) []uint16 {
exposedSet := make(map[uint16]struct{}, len(exposed))
for _, p := range exposed {
exposedSet[uint16(p.Port)] = struct{}{}
}

out := make([]uint16, 0, len(exposed))
seen := make(map[uint16]struct{}, len(exposed))

for _, h := range hints {
p := uint16(h)
if _, ok := exposedSet[p]; !ok {
continue
}
if _, dup := seen[p]; dup {
continue
}
out = append(out, p)
seen[p] = struct{}{}
}

for _, p := range exposed {
port := uint16(p.Port)
if _, dup := seen[port]; dup {
continue
}
out = append(out, port)
seen[port] = struct{}{}
}

return out
}
42 changes: 42 additions & 0 deletions comp/core/autodiscovery/discovery/candidates_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package discovery

import (
"reflect"
"testing"

workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
)

func TestCandidatePorts(t *testing.T) {
exposed := []workloadmeta.ContainerPort{{Port: 9000}, {Port: 8090}, {Port: 9001}}

tests := []struct {
name string
hints []int
want []uint16
}{
{"no hints — fallback only", nil, []uint16{9000, 8090, 9001}},
{"hint matches one exposed", []int{8090}, []uint16{8090, 9000, 9001}},
{"hint not exposed is dropped", []int{1234}, []uint16{9000, 8090, 9001}},
{"two hints, declared order preserved", []int{8090, 9000}, []uint16{8090, 9000, 9001}},
{"empty exposed yields empty", nil, []uint16{}},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ex := exposed
if tc.name == "empty exposed yields empty" {
ex = nil
}
got := candidatePorts(tc.hints, ex)
if !reflect.DeepEqual(got, tc.want) {
t.Fatalf("got %+v want %+v", got, tc.want)
}
})
}
}
Loading
Loading