diff --git a/comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go b/comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go index d761560da8d6..d900f460db2f 100644 --- a/comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go +++ b/comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go @@ -27,6 +27,7 @@ import ( api "github.com/DataDog/datadog-agent/comp/api/api/def" "github.com/DataDog/datadog-agent/comp/core/autodiscovery" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/discovery" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers" @@ -201,7 +202,7 @@ func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolv if h, ok := hp.Get(); ok { hpComp = h } - cfgMgr := newReconcilingConfigManager(secretResolver, hpComp) + cfgMgr := newReconcilingConfigManager(secretResolver, hpComp, discovery.NewOpenMetricsProber()) ac := &AutoConfig{ configPollers: make([]*configPoller, 0, 9), listenerCandidates: make(map[string]*listenerCandidate), diff --git a/comp/core/autodiscovery/autodiscoveryimpl/configmgr.go b/comp/core/autodiscovery/autodiscoveryimpl/configmgr.go index 41d65fd6eb7d..2857b0191177 100644 --- a/comp/core/autodiscovery/autodiscoveryimpl/configmgr.go +++ b/comp/core/autodiscovery/autodiscoveryimpl/configmgr.go @@ -6,6 +6,7 @@ package autodiscoveryimpl import ( + "context" "fmt" "maps" "sync" @@ -13,6 +14,7 @@ import ( healthplatformpayload "github.com/DataDog/agent-payload/v5/healthplatform" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/configresolver" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/discovery" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names" @@ -110,12 +112,14 @@ type reconcilingConfigManager struct { secretResolver secrets.Component healthPlatform healthplatformdef.Component + prober discovery.Prober } var _ configManager = &reconcilingConfigManager{} // newReconcilingConfigManager creates a new, empty reconcilingConfigManager. -func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatform healthplatformdef.Component) configManager { +// prober may be nil; templates with Discovery set will be skipped if so. +func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatform healthplatformdef.Component, prober discovery.Prober) configManager { return &reconcilingConfigManager{ activeConfigs: map[string]integration.Config{}, activeServices: map[string]serviceAndADIDs{}, @@ -125,6 +129,7 @@ func newReconcilingConfigManager(secretResolver secrets.Component, healthPlatfor scheduledConfigs: map[string]integration.Config{}, secretResolver: secretResolver, healthPlatform: healthPlatform, + prober: prober, } } @@ -408,7 +413,26 @@ func (cm *reconcilingConfigManager) reconcileService(svcID string) integration.C // returns false. func (cm *reconcilingConfigManager) resolveTemplateForService(tpl integration.Config, svc listeners.Service) (integration.Config, bool) { digest := tpl.Digest() - config, err := configresolver.Resolve(tpl, svc) + resolvedSvc := svc + + if tpl.Discovery != nil { + if cm.prober == nil { + msg := fmt.Sprintf("template %s has Discovery set but no prober is configured", tpl.Name) + log.Errorf("autodiscovery: %s", msg) + errorStats.setResolveWarning(tpl.Name, msg) + return tpl, false + } + result, ok := cm.prober.Probe(context.Background(), tpl.Discovery, svc) + if !ok { + msg := fmt.Sprintf("discovery probe did not match for template %s and service %s", tpl.Name, svc.GetServiceID()) + log.Debugf("autodiscovery: %s", msg) + errorStats.setResolveWarning(tpl.Name, msg) + return tpl, false + } + resolvedSvc = discovery.WrapWithProbeResult(svc, result) + } + + config, err := configresolver.Resolve(tpl, resolvedSvc) if err != nil { msg := fmt.Sprintf("error resolving template %s for service %s: %v", tpl.Name, svc.GetServiceID(), err) log.Errorf("autodiscovery: skipping config - %s", msg) diff --git a/comp/core/autodiscovery/autodiscoveryimpl/configmgr_test.go b/comp/core/autodiscovery/autodiscoveryimpl/configmgr_test.go index 68d80962e9f8..365d1884ce59 100644 --- a/comp/core/autodiscovery/autodiscoveryimpl/configmgr_test.go +++ b/comp/core/autodiscovery/autodiscoveryimpl/configmgr_test.go @@ -545,7 +545,7 @@ func TestReconcilingConfigManagement(t *testing.T) { mockResolver := MockSecretResolver{} suite.Run(t, &ReconcilingConfigManagerSuite{ ConfigManagerSuite{factory: func() configManager { - return newReconcilingConfigManager(&mockResolver, nil) + return newReconcilingConfigManager(&mockResolver, nil, nil) }}, }) } @@ -563,7 +563,7 @@ func TestResolveTemplateForService_ReportsToHealthPlatform(t *testing.T) { mockResolver := MockSecretResolver{} hp := healthplatformmock.Mock(t) - cm := newReconcilingConfigManager(&mockResolver, hp).(*reconcilingConfigManager) + cm := newReconcilingConfigManager(&mockResolver, hp, nil).(*reconcilingConfigManager) tpl := integration.Config{ Name: "postgres", @@ -596,7 +596,7 @@ func TestResolveTemplateForService_ClearsHealthPlatformOnSuccess(t *testing.T) { mockResolver := MockSecretResolver{} hp := healthplatformmock.Mock(t) - cm := newReconcilingConfigManager(&mockResolver, hp).(*reconcilingConfigManager) + cm := newReconcilingConfigManager(&mockResolver, hp, nil).(*reconcilingConfigManager) tpl := integration.Config{ Name: "redis", diff --git a/comp/core/autodiscovery/discovery/cache.go b/comp/core/autodiscovery/discovery/cache.go new file mode 100644 index 000000000000..474a95830c2e --- /dev/null +++ b/comp/core/autodiscovery/discovery/cache.go @@ -0,0 +1,60 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "sync" + "time" +) + +type cacheEntry struct { + result ProbeResult + success bool + expiresAt time.Time // zero = never +} + +type probeCache struct { + mu sync.Mutex + entries map[string]cacheEntry + now func() time.Time +} + +func newProbeCache(now func() time.Time) *probeCache { + if now == nil { + now = time.Now + } + return &probeCache{entries: make(map[string]cacheEntry), now: now} +} + +func cacheKey(svcID, cfgHash string) string { + return svcID + "|" + cfgHash +} + +func (c *probeCache) get(svcID, cfgHash string) (ProbeResult, bool, bool) { + c.mu.Lock() + defer c.mu.Unlock() + e, ok := c.entries[cacheKey(svcID, cfgHash)] + if !ok { + return ProbeResult{}, false, false + } + if !e.expiresAt.IsZero() && c.now().After(e.expiresAt) { + delete(c.entries, cacheKey(svcID, cfgHash)) + return ProbeResult{}, false, false + } + return e.result, e.success, true +} + +func (c *probeCache) putSuccess(svcID, cfgHash string, r ProbeResult) { + c.mu.Lock() + defer c.mu.Unlock() + c.entries[cacheKey(svcID, cfgHash)] = cacheEntry{result: r, success: true} +} + +func (c *probeCache) putFailure(svcID, cfgHash string, ttl time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.entries[cacheKey(svcID, cfgHash)] = cacheEntry{success: false, expiresAt: c.now().Add(ttl)} +} diff --git a/comp/core/autodiscovery/discovery/cache_test.go b/comp/core/autodiscovery/discovery/cache_test.go new file mode 100644 index 000000000000..e2f5b63e413d --- /dev/null +++ b/comp/core/autodiscovery/discovery/cache_test.go @@ -0,0 +1,51 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "testing" + "time" +) + +func TestProbeCache_HitAndExpiry(t *testing.T) { + now := time.Unix(1_700_000_000, 0) + clock := func() time.Time { return now } + c := newProbeCache(clock) + + // Empty cache — miss. + if _, _, ok := c.get("svc1", "h1"); ok { + t.Fatal("expected miss on empty cache") + } + + // Successful probe entry, never expires. + c.putSuccess("svc1", "h1", ProbeResult{Port: 8090}) + if r, success, ok := c.get("svc1", "h1"); !ok || !success || r.Port != 8090 { + t.Fatalf("expected hit success(8090); got ok=%v success=%v port=%d", ok, success, r.Port) + } + + // Failed probe entry, expires after 30s. + c.putFailure("svc1", "h2", 30*time.Second) + if _, success, ok := c.get("svc1", "h2"); !ok || success { + t.Fatal("expected hit failure") + } + now = now.Add(31 * time.Second) + if _, _, ok := c.get("svc1", "h2"); ok { + t.Fatal("expected miss after expiry") + } +} + +func TestProbeCache_DifferentKeysIsolated(t *testing.T) { + now := time.Unix(0, 0) + c := newProbeCache(func() time.Time { return now }) + c.putSuccess("svcA", "h1", ProbeResult{Port: 1}) + c.putSuccess("svcB", "h1", ProbeResult{Port: 2}) + if r, _, _ := c.get("svcA", "h1"); r.Port != 1 { + t.Fatalf("svcA: got %d", r.Port) + } + if r, _, _ := c.get("svcB", "h1"); r.Port != 2 { + t.Fatalf("svcB: got %d", r.Port) + } +} diff --git a/comp/core/autodiscovery/discovery/candidates.go b/comp/core/autodiscovery/discovery/candidates.go new file mode 100644 index 000000000000..ac3287364b28 --- /dev/null +++ b/comp/core/autodiscovery/discovery/candidates.go @@ -0,0 +1,43 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" +) + +func candidatePorts(hints []int, exposed []workloadmeta.ContainerPort) []uint16 { + exposedSet := make(map[uint16]struct{}, len(exposed)) + for _, p := range exposed { + exposedSet[uint16(p.Port)] = struct{}{} + } + + out := make([]uint16, 0, len(exposed)) + seen := make(map[uint16]struct{}, len(exposed)) + + for _, h := range hints { + p := uint16(h) + if _, ok := exposedSet[p]; !ok { + continue + } + if _, dup := seen[p]; dup { + continue + } + out = append(out, p) + seen[p] = struct{}{} + } + + for _, p := range exposed { + port := uint16(p.Port) + if _, dup := seen[port]; dup { + continue + } + out = append(out, port) + seen[port] = struct{}{} + } + + return out +} diff --git a/comp/core/autodiscovery/discovery/candidates_test.go b/comp/core/autodiscovery/discovery/candidates_test.go new file mode 100644 index 000000000000..05d713ef5edf --- /dev/null +++ b/comp/core/autodiscovery/discovery/candidates_test.go @@ -0,0 +1,42 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "reflect" + "testing" + + workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" +) + +func TestCandidatePorts(t *testing.T) { + exposed := []workloadmeta.ContainerPort{{Port: 9000}, {Port: 8090}, {Port: 9001}} + + tests := []struct { + name string + hints []int + want []uint16 + }{ + {"no hints — fallback only", nil, []uint16{9000, 8090, 9001}}, + {"hint matches one exposed", []int{8090}, []uint16{8090, 9000, 9001}}, + {"hint not exposed is dropped", []int{1234}, []uint16{9000, 8090, 9001}}, + {"two hints, declared order preserved", []int{8090, 9000}, []uint16{8090, 9000, 9001}}, + {"empty exposed yields empty", nil, []uint16{}}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ex := exposed + if tc.name == "empty exposed yields empty" { + ex = nil + } + got := candidatePorts(tc.hints, ex) + if !reflect.DeepEqual(got, tc.want) { + t.Fatalf("got %+v want %+v", got, tc.want) + } + }) + } +} diff --git a/comp/core/autodiscovery/discovery/openmetrics_prober.go b/comp/core/autodiscovery/discovery/openmetrics_prober.go new file mode 100644 index 000000000000..f084c2d83dd2 --- /dev/null +++ b/comp/core/autodiscovery/discovery/openmetrics_prober.go @@ -0,0 +1,175 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const ( + defaultPath = "/metrics" + defaultPerProbe = 500 * time.Millisecond + defaultBudget = 2 * time.Second + defaultMaxAttempts = 8 + defaultFailureTTL = 30 * time.Second +) + +var promLineRe = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*(\{[^}]*\})?\s+\S+`) + +// OpenMetricsProberOption configures an OpenMetricsProber. +type OpenMetricsProberOption func(*openMetricsProber) + +// WithFailureTTL overrides the negative-cache TTL. +func WithFailureTTL(d time.Duration) OpenMetricsProberOption { + return func(p *openMetricsProber) { p.failureTTL = d } +} + +type openMetricsProber struct { + client *http.Client + cache *probeCache + perProbe time.Duration + totalBudget time.Duration + maxAttempts int + failureTTL time.Duration +} + +// NewOpenMetricsProber returns a Prober that verifies OpenMetrics endpoints. +func NewOpenMetricsProber(opts ...OpenMetricsProberOption) Prober { + p := &openMetricsProber{ + client: &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}, + cache: newProbeCache(time.Now), + perProbe: defaultPerProbe, + totalBudget: defaultBudget, + maxAttempts: defaultMaxAttempts, + failureTTL: defaultFailureTTL, + } + for _, o := range opts { + o(p) + } + return p +} + +func (p *openMetricsProber) Probe(ctx context.Context, cfg *integration.DiscoveryConfig, svc listeners.Service) (ProbeResult, bool) { + if cfg == nil || cfg.Type != "openmetrics" { + return ProbeResult{}, false + } + host, ok := pickHost(svc) + if !ok { + log.Debugf("autodiscovery/discovery: %s has no host, skipping", svc.GetServiceID()) + return ProbeResult{}, false + } + exposed, err := svc.GetPorts() + if err != nil || len(exposed) == 0 { + return ProbeResult{}, false + } + + cfgHash := hashDiscoveryConfig(cfg) + if r, success, hit := p.cache.get(svc.GetServiceID(), cfgHash); hit { + return r, success + } + + path := cfg.Path + if path == "" { + path = defaultPath + } + candidates := candidatePorts(cfg.Ports, exposed) + deadline := time.Now().Add(p.totalBudget) + + attempts := 0 + for _, port := range candidates { + if attempts >= p.maxAttempts || time.Now().After(deadline) { + break + } + attempts++ + if p.tryPort(ctx, host, port, path) { + r := ProbeResult{Port: port} + p.cache.putSuccess(svc.GetServiceID(), cfgHash, r) + log.Infof("autodiscovery/discovery: probe matched %s:%d%s for %s", host, port, path, svc.GetServiceID()) + return r, true + } + } + + p.cache.putFailure(svc.GetServiceID(), cfgHash, p.failureTTL) + log.Debugf("autodiscovery/discovery: %d candidate(s) for %s did not match", len(candidates), svc.GetServiceID()) + return ProbeResult{}, false +} + +func (p *openMetricsProber) tryPort(ctx context.Context, host string, port uint16, path string) bool { + url := "http://" + net.JoinHostPort(host, strconv.Itoa(int(port))) + path + tctx, cancel := context.WithTimeout(ctx, p.perProbe) + defer cancel() + req, err := http.NewRequestWithContext(tctx, http.MethodGet, url, nil) + if err != nil { + return false + } + resp, err := p.client.Do(req) + if err != nil { + return false + } + defer resp.Body.Close() + body, err := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) + if err != nil { + return false + } + return verifyOpenMetricsResponse(resp.StatusCode, resp.Header.Get("Content-Type"), body) +} + +func verifyOpenMetricsResponse(status int, contentType string, body []byte) bool { + if status != http.StatusOK { + return false + } + ct := strings.ToLower(contentType) + if !strings.HasPrefix(ct, "text/plain") && !strings.HasPrefix(ct, "application/openmetrics-text") { + return false + } + for _, line := range strings.Split(string(body), "\n") { + s := strings.TrimSpace(line) + if s == "" || strings.HasPrefix(s, "#") { + continue + } + return promLineRe.MatchString(s) + } + return false +} + +func pickHost(svc listeners.Service) (string, bool) { + hosts, err := svc.GetHosts() + if err != nil || len(hosts) == 0 { + return "", false + } + if h, ok := hosts["bridge"]; ok && h != "" { + return h, true + } + for _, h := range hosts { + if h != "" { + return h, true + } + } + return "", false +} + +func hashDiscoveryConfig(cfg *integration.DiscoveryConfig) string { + h := sha256.New() + fmt.Fprintf(h, "%s|%s|", cfg.Type, cfg.Path) + for _, p := range cfg.Ports { + fmt.Fprintf(h, "%d,", p) + } + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/comp/core/autodiscovery/discovery/openmetrics_prober_test.go b/comp/core/autodiscovery/discovery/openmetrics_prober_test.go new file mode 100644 index 000000000000..754dbb51eff4 --- /dev/null +++ b/comp/core/autodiscovery/discovery/openmetrics_prober_test.go @@ -0,0 +1,137 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "context" + "fmt" + "net" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" + workloadfilter "github.com/DataDog/datadog-agent/comp/core/workloadfilter/def" + workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" +) + +func TestVerifyOpenMetricsResponse(t *testing.T) { + cases := []struct { + name string + status int + contentType string + body string + want bool + }{ + {"prom-text", 200, "text/plain; version=0.0.4", "go_goroutines 5\n", true}, + {"openmetrics-text", 200, "application/openmetrics-text; version=1.0.0", "go_goroutines 5\n", true}, + {"json", 200, "application/json", `{"a":1}`, false}, + {"html", 200, "text/html", "", false}, + {"401", 401, "text/plain", "go_goroutines 5\n", false}, + {"prom-no-line", 200, "text/plain", "# HELP only\n# TYPE only\n", false}, + {"prom-with-labels", 200, "text/plain", `http_requests_total{code="200"} 1027` + "\n", true}, + {"prom-with-comments-first", 200, "text/plain", "# HELP foo bar\n# TYPE foo counter\nfoo 1\n", true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if got := verifyOpenMetricsResponse(tc.status, tc.contentType, []byte(tc.body)); got != tc.want { + t.Fatalf("got %v want %v", got, tc.want) + } + }) + } +} + +// fakeService implements listeners.Service minimally for prober tests. +type fakeService struct { + id string + hosts map[string]string + ports []workloadmeta.ContainerPort +} + +func (f *fakeService) GetServiceID() string { return f.id } +func (f *fakeService) GetADIdentifiers() []string { return []string{"krakend"} } +func (f *fakeService) GetHosts() (map[string]string, error) { return f.hosts, nil } +func (f *fakeService) GetPorts() ([]workloadmeta.ContainerPort, error) { + return f.ports, nil +} +func (f *fakeService) GetTags() ([]string, error) { return nil, nil } +func (f *fakeService) GetTagsWithCardinality(string) ([]string, error) { return nil, nil } +func (f *fakeService) GetPid() (int, error) { return 0, nil } +func (f *fakeService) GetHostname() (string, error) { return "", nil } +func (f *fakeService) IsReady() bool { return true } +func (f *fakeService) HasFilter(workloadfilter.Scope) bool { return false } +func (f *fakeService) GetExtraConfig(key string) (string, error) { + return "", fmt.Errorf("unknown extra config %q", key) +} +func (f *fakeService) FilterTemplates(map[string]integration.Config) {} +func (f *fakeService) GetImageName() string { return "krakend:test" } +func (f *fakeService) Equal(other listeners.Service) bool { + return f.id == other.GetServiceID() +} + +func TestProbe_HintMatchesFirst(t *testing.T) { + bad := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(404) + })) + defer bad.Close() + good := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + _, _ = w.Write([]byte("go_goroutines 5\n")) + })) + defer good.Close() + + badHost, badPortStr, _ := net.SplitHostPort(bad.Listener.Addr().String()) + goodHost, goodPortStr, _ := net.SplitHostPort(good.Listener.Addr().String()) + badPort, _ := strconv.Atoi(badPortStr) + goodPort, _ := strconv.Atoi(goodPortStr) + if badHost != goodHost { + t.Fatalf("test assumption: both servers on same host (got %s, %s)", badHost, goodHost) + } + + svc := &fakeService{ + id: "container_id://abc", + hosts: map[string]string{"bridge": badHost}, + ports: []workloadmeta.ContainerPort{{Port: badPort}, {Port: goodPort}}, + } + cfg := &integration.DiscoveryConfig{ + Type: "openmetrics", + Ports: []int{goodPort}, + Path: "/metrics", + } + + p := NewOpenMetricsProber(WithFailureTTL(time.Second)) + r, ok := p.Probe(context.Background(), cfg, svc) + if !ok { + t.Fatal("expected probe success") + } + if int(r.Port) != goodPort { + t.Fatalf("port: got %d want %d", r.Port, goodPort) + } +} + +func TestProbe_AllFailReturnsFalse(t *testing.T) { + bad := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(404) + })) + defer bad.Close() + host, portStr, _ := net.SplitHostPort(bad.Listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + + svc := &fakeService{ + id: "container_id://xyz", + hosts: map[string]string{"bridge": host}, + ports: []workloadmeta.ContainerPort{{Port: port}}, + } + cfg := &integration.DiscoveryConfig{Type: "openmetrics", Path: "/metrics"} + + p := NewOpenMetricsProber(WithFailureTTL(time.Second)) + if _, ok := p.Probe(context.Background(), cfg, svc); ok { + t.Fatal("expected probe failure") + } +} diff --git a/comp/core/autodiscovery/discovery/service_wrapper.go b/comp/core/autodiscovery/discovery/service_wrapper.go new file mode 100644 index 000000000000..9cc99b184a10 --- /dev/null +++ b/comp/core/autodiscovery/discovery/service_wrapper.go @@ -0,0 +1,31 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import ( + "strconv" + + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" +) + +// WrapWithProbeResult returns a Service that overlays ProbeResult-derived +// values on the underlying Service via GetExtraConfig. Today only +// "discovered_port" is exposed. +func WrapWithProbeResult(svc listeners.Service, r ProbeResult) listeners.Service { + return &serviceWithProbeResult{Service: svc, result: r} +} + +type serviceWithProbeResult struct { + listeners.Service + result ProbeResult +} + +func (s *serviceWithProbeResult) GetExtraConfig(key string) (string, error) { + if key == "discovered_port" { + return strconv.Itoa(int(s.result.Port)), nil + } + return s.Service.GetExtraConfig(key) +} diff --git a/comp/core/autodiscovery/discovery/service_wrapper_test.go b/comp/core/autodiscovery/discovery/service_wrapper_test.go new file mode 100644 index 000000000000..7349a7b80961 --- /dev/null +++ b/comp/core/autodiscovery/discovery/service_wrapper_test.go @@ -0,0 +1,25 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package discovery + +import "testing" + +func TestServiceWithProbeResult_GetExtraConfig(t *testing.T) { + base := &fakeService{id: "svc"} + w := WrapWithProbeResult(base, ProbeResult{Port: 8090}) + + v, err := w.GetExtraConfig("discovered_port") + if err != nil { + t.Fatalf("error: %v", err) + } + if v != "8090" { + t.Fatalf("got %q want 8090", v) + } + + if _, err := w.GetExtraConfig("unknown"); err == nil { + t.Fatal("expected error for unknown extra key") + } +} diff --git a/comp/core/autodiscovery/discovery/types.go b/comp/core/autodiscovery/discovery/types.go new file mode 100644 index 000000000000..93ff19ad9b19 --- /dev/null +++ b/comp/core/autodiscovery/discovery/types.go @@ -0,0 +1,30 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +// Package discovery implements probe-based "advanced auto-config" — running +// a verifying probe against a discovered Service to derive instance config +// values that cannot be expressed by template substitution alone. +package discovery + +import ( + "context" + + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/listeners" +) + +// ProbeResult is the outcome of a successful probe. +type ProbeResult struct { + // Port is the discovered TCP port that responded successfully to the + // probe. + Port uint16 +} + +// Prober probes a Service against a DiscoveryConfig and returns a result +// when one of the candidate (host, port, path) tuples verifies. If no +// candidate verifies within the budget, ok is false. +type Prober interface { + Probe(ctx context.Context, cfg *integration.DiscoveryConfig, svc listeners.Service) (result ProbeResult, ok bool) +} diff --git a/comp/core/autodiscovery/integration/config.go b/comp/core/autodiscovery/integration/config.go index 450b4239a0c1..b2ceaf096adb 100644 --- a/comp/core/autodiscovery/integration/config.go +++ b/comp/core/autodiscovery/integration/config.go @@ -119,6 +119,19 @@ type Config struct { // ImageName is the container image name if any ImageName string `json:"image_name"` // (include in digest: false) + + // Discovery, when non-nil, signals that this config is a discovery + // template: AutoDiscovery must run a probe against the matched service + // before substituting %%discovered_port%%. + Discovery *DiscoveryConfig `json:"discovery"` // (include in digest: true) +} + +// DiscoveryConfig describes how to probe a service to find its check +// endpoint. Currently only Type=="openmetrics" is supported. +type DiscoveryConfig struct { + Type string `yaml:"type" json:"type"` + Ports []int `yaml:"ports,omitempty" json:"ports,omitempty"` + Path string `yaml:"path,omitempty" json:"path,omitempty"` } // MatchingProgram is an interface for matching objects against filter rules. diff --git a/comp/core/autodiscovery/integration/config_test.go b/comp/core/autodiscovery/integration/config_test.go index be8f287d903b..5fd81ac25c99 100644 --- a/comp/core/autodiscovery/integration/config_test.go +++ b/comp/core/autodiscovery/integration/config_test.go @@ -301,3 +301,25 @@ func BenchmarkID(b *testing.B) { } result = id } + +func TestDiscoveryConfig_FieldsAndZeroValue(t *testing.T) { + var c Config + if c.Discovery != nil { + t.Fatalf("Discovery should default to nil, got %+v", c.Discovery) + } + + c.Discovery = &DiscoveryConfig{ + Type: "openmetrics", + Ports: []int{8090}, + Path: "/metrics", + } + if c.Discovery.Type != "openmetrics" { + t.Fatalf("Type round-trip failed: %s", c.Discovery.Type) + } + if got, want := len(c.Discovery.Ports), 1; got != want { + t.Fatalf("Ports length: got %d want %d", got, want) + } + if c.Discovery.Path != "/metrics" { + t.Fatalf("Path round-trip failed: %s", c.Discovery.Path) + } +} diff --git a/comp/core/autodiscovery/providers/config_reader.go b/comp/core/autodiscovery/providers/config_reader.go index 2071177cdbf5..36c07ac2c3c5 100644 --- a/comp/core/autodiscovery/providers/config_reader.go +++ b/comp/core/autodiscovery/providers/config_reader.go @@ -43,6 +43,7 @@ type configFormat struct { DockerImages []string `yaml:"docker_images,omitempty"` // Only imported for deprecation warning IgnoreAutodiscoveryTags bool `yaml:"ignore_autodiscovery_tags,omitempty"` // Use to ignore tags coming from autodiscovery CheckTagCardinality string `yaml:"check_tag_cardinality,omitempty"` // Use to set the tag cardinality override for the check + Discovery *integration.DiscoveryConfig `yaml:"discovery,omitempty"` } // ConfigFormatWrapper is a wrapper for the config format @@ -502,6 +503,9 @@ func GetIntegrationConfigFromFile(name, fpath string) (integration.Config, Confi // Copy check_tag_cardinality parameter conf.CheckTagCardinality = cf.CheckTagCardinality + // Copy discovery probe spec + conf.Discovery = cf.Discovery + // DockerImages entry was found: we ignore it if no ADIdentifiers has been found if len(cf.DockerImages) > 0 && len(cf.ADIdentifiers) == 0 { return conf, ConfigFormatWrapper{}, errors.New("the 'docker_images' section is deprecated, please use 'ad_identifiers' instead") diff --git a/comp/core/autodiscovery/providers/config_reader_test.go b/comp/core/autodiscovery/providers/config_reader_test.go index 3ff46331200b..f7e86b09a86e 100644 --- a/comp/core/autodiscovery/providers/config_reader_test.go +++ b/comp/core/autodiscovery/providers/config_reader_test.go @@ -121,13 +121,24 @@ func TestGetIntegrationConfig(t *testing.T) { assert.Empty(t, config.ServiceID) } +func TestGetIntegrationConfig_Discovery(t *testing.T) { + config, _, err := GetIntegrationConfigFromFile("krakend", "tests/auto_conf_discovery.yaml") + require.Nil(t, err) + require.NotNil(t, config.Discovery) + assert.Equal(t, "openmetrics", config.Discovery.Type) + assert.Equal(t, []int{8090}, config.Discovery.Ports) + assert.Equal(t, "/metrics", config.Discovery.Path) + assert.Equal(t, []string{"krakend"}, config.ADIdentifiers) + assert.Len(t, config.Instances, 1) +} + func TestReadConfigFiles(t *testing.T) { paths := []string{"tests"} ResetReader(paths) configs, errors, err := ReadConfigFiles(GetAll) require.Nil(t, err) - require.Equal(t, 21, len(configs)) + require.Equal(t, 22, len(configs)) require.Equal(t, 4, len(errors)) for _, c := range configs { @@ -138,7 +149,7 @@ func TestReadConfigFiles(t *testing.T) { configs, _, err = ReadConfigFiles(WithoutAdvancedAD) require.Nil(t, err) - require.Equal(t, 19, len(configs)) + require.Equal(t, 20, len(configs)) expectedConfig1 := integration.Config{ Name: "advanced_ad", diff --git a/comp/core/autodiscovery/providers/file_test.go b/comp/core/autodiscovery/providers/file_test.go index b2b57b761423..636aa6f6494b 100644 --- a/comp/core/autodiscovery/providers/file_test.go +++ b/comp/core/autodiscovery/providers/file_test.go @@ -82,7 +82,7 @@ func TestCollect(t *testing.T) { assert.Equal(t, 0, len(get("ignored"))) // total number of configurations found - assert.Equal(t, 18, len(configs)) + assert.Equal(t, 19, len(configs)) // incorrect configs get saved in the Errors map (invalid.yaml & notaconfig.yaml & ad_deprecated.yaml & null_instances.yml) assert.Equal(t, 4, len(provider.Errors)) diff --git a/comp/core/autodiscovery/providers/tests/auto_conf_discovery.yaml b/comp/core/autodiscovery/providers/tests/auto_conf_discovery.yaml new file mode 100644 index 000000000000..63092817bf5f --- /dev/null +++ b/comp/core/autodiscovery/providers/tests/auto_conf_discovery.yaml @@ -0,0 +1,9 @@ +ad_identifiers: + - krakend +discovery: + type: openmetrics + ports: [8090] + path: /metrics +init_config: +instances: + - openmetrics_endpoint: "http://%%host%%:%%discovered_port%%/metrics" diff --git a/pkg/util/tmplvar/resolver.go b/pkg/util/tmplvar/resolver.go index 7f4dcfcaa858..b1cb546fa3f9 100644 --- a/pkg/util/tmplvar/resolver.go +++ b/pkg/util/tmplvar/resolver.go @@ -94,12 +94,13 @@ type TemplateResolver struct { func NewTemplateResolver(parser Parser, postProcessor func(interface{}) error, supportEnvVars bool) *TemplateResolver { templateVariables := map[string]VariableGetter{ - "host": GetHost, - "pid": GetPid, - "port": GetPort, - "hostname": GetHostname, - "extra": GetAdditionalTplVariables, - "kube": GetAdditionalTplVariables, + "host": GetHost, + "pid": GetPid, + "port": GetPort, + "hostname": GetHostname, + "extra": GetAdditionalTplVariables, + "kube": GetAdditionalTplVariables, + "discovered": GetDiscoveredPort, } if supportEnvVars { templateVariables["env"] = GetEnvvar @@ -477,6 +478,23 @@ func GetAdditionalTplVariables(tplVar string, res Resolvable) (string, error) { return value, nil } +// GetDiscoveredPort resolves the %%discovered_port%% template variable. It is +// populated by the autodiscovery/discovery package when a probe matches a +// service. The value flows in via GetExtraConfig on a Service wrapper. +func GetDiscoveredPort(tplVar string, res Resolvable) (string, error) { + if res == nil { + return "", noResolverError("no resolver. %%%%discovered_port%%%% is not allowed") + } + if tplVar != "port" { + return "", noResolverError(fmt.Sprintf("unsupported %%discovered_%s%% variable; only %%discovered_port%% is recognised", tplVar)) + } + v, err := res.GetExtraConfig("discovered_port") + if err != nil || v == "" { + return "", noResolverError("discovered_port not available — autodiscovery probe did not run or did not match") + } + return v, nil +} + // GetEnvvar resolves the %%env_*%% template variable func GetEnvvar(envVar string, res Resolvable) (string, error) { if len(envVar) == 0 { diff --git a/pkg/util/tmplvar/resolver_test.go b/pkg/util/tmplvar/resolver_test.go index 9a8b66dd6ad6..db84cdeb8a1d 100644 --- a/pkg/util/tmplvar/resolver_test.go +++ b/pkg/util/tmplvar/resolver_test.go @@ -8,6 +8,7 @@ package tmplvar import ( "errors" "fmt" + "strings" "testing" workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def" @@ -298,6 +299,40 @@ func TestEnvVarTemplateVarsAlwaysResolveAsStrings(t *testing.T) { } } +func TestResolveDiscoveredPort(t *testing.T) { + res := &mockResolvable{ + extraConfig: map[string]string{ + "discovered_port": "8090", + }, + } + r := NewTemplateResolver(YAMLParser, nil, false) + out, err := r.ResolveDataWithTemplateVars([]byte(`url: "http://example:%%discovered_port%%/metrics"`+"\n"), res) + if err != nil { + t.Fatalf("err: %v", err) + } + if got, want := strings.TrimSpace(string(out)), `url: http://example:8090/metrics`; got != want { + t.Fatalf("got %q want %q", got, want) + } +} + +func TestResolveDiscoveredPort_MissingErrors(t *testing.T) { + res := &mockResolvable{} + r := NewTemplateResolver(YAMLParser, nil, false) + _, err := r.ResolveDataWithTemplateVars([]byte(`url: "http://example:%%discovered_port%%/metrics"`+"\n"), res) + if err == nil { + t.Fatal("expected error when discovered_port is unavailable") + } +} + +func TestGetDiscoveredPort_NilResolvable(t *testing.T) { + // SubstituteTemplateEnvVars passes nil at config-load time. Must not panic. + _, err := GetDiscoveredPort("port", nil) + var nre *NoResolverError + if err == nil || !errors.As(err, &nre) { + t.Fatalf("expected NoResolverError, got %v", err) + } +} + func TestGetFallbackHost(t *testing.T) { ip, err := getFallbackHost(map[string]string{"bridge": "172.17.0.1"}) assert.Equal(t, "172.17.0.1", ip)