From 2da44b243e4bca2edeeb3c42775a31acca6240be Mon Sep 17 00:00:00 2001 From: cuichen Date: Mon, 23 Mar 2026 16:05:15 +0800 Subject: [PATCH 01/14] add collector run hook --- .../domain/trace/entity/collector/collector.go | 14 +++++++++----- .../domain/trace/service/ingestion.go | 8 ++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/collector.go b/backend/modules/observability/domain/trace/entity/collector/collector.go index 177e74516..0b13423ec 100644 --- a/backend/modules/observability/domain/trace/entity/collector/collector.go +++ b/backend/modules/observability/domain/trace/entity/collector/collector.go @@ -135,8 +135,8 @@ func (col *Collector) WaitForReady() { } // 通常在异步线程中进行, 主线程需要等待初始化完成 -func (col *Collector) Run(ctx context.Context) error { - if err := col.setupConfigurationComponents(ctx); err != nil { +func (col *Collector) Run(ctx context.Context, hook func() error) error { + if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil { return err } signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) @@ -151,8 +151,8 @@ func (col *Collector) Run(ctx context.Context) error { } // 同步阻塞执行 -func (col *Collector) RunInOne(ctx context.Context) error { - if err := col.setupConfigurationComponents(ctx); err != nil { +func (col *Collector) RunInOne(ctx context.Context, hook func() error) error { + if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil { return err } signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) @@ -165,7 +165,7 @@ func (col *Collector) RunInOne(ctx context.Context) error { return col.shutdown(ctx) } -func (col *Collector) setupConfigurationComponents(ctx context.Context) error { +func (col *Collector) setupConfigurationComponentsWithHook(ctx context.Context, hook func() error) error { factories, err := col.set.Factories() if err != nil { return err @@ -195,6 +195,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("failed to start tenant %q, %v", tenantName, err) } } + + if err = hook(); err != nil { + fmt.Printf("hook failed, %v\n", err) + } return nil } diff --git a/backend/modules/observability/domain/trace/service/ingestion.go b/backend/modules/observability/domain/trace/service/ingestion.go index f439d6cfa..2fe61ffc6 100644 --- a/backend/modules/observability/domain/trace/service/ingestion.go +++ b/backend/modules/observability/domain/trace/service/ingestion.go @@ -64,12 +64,16 @@ type IngestionServiceImpl struct { } func (i *IngestionServiceImpl) RunSync(ctx context.Context) error { - return i.c.RunInOne(ctx) + return i.c.RunInOne(ctx, func() error { + return nil + }) } func (i *IngestionServiceImpl) RunAsync(ctx context.Context) { go func() { - err := i.c.Run(ctx) + err := i.c.Run(ctx, func() error { + return nil + }) if err != nil { panic(err) } From d4e150b0a130dc3c9f143f3b645f66702a206203 Mon Sep 17 00:00:00 2001 From: cuichen Date: Mon, 23 Mar 2026 17:06:32 +0800 Subject: [PATCH 02/14] add GetDurationByTTL --- .../domain/trace/entity/loop_span/span.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span.go b/backend/modules/observability/domain/trace/entity/loop_span/span.go index 3642e6f5f..131682b32 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span.go @@ -96,6 +96,25 @@ const ( TTL365d TTL = "365d" ) +func GetDurationByTTL(ttl TTL) time.Duration { + switch ttl { + case TTL3d: + return time.Hour * 24 * 3 + case TTL7d: + return time.Hour * 24 * 7 + case TTL30d: + return time.Hour * 24 * 30 + case TTL90d: + return time.Hour * 24 * 90 + case TTL180d: + return time.Hour * 24 * 180 + case TTL365d: + return time.Hour * 24 * 365 + default: + return time.Hour * 24 * 3 + } +} + var TimeTagSlice = []string{ SpanFieldStartTimeFirstResp, SpanFieldLatencyFirstResp, From 2083ee793c6f75057618538870867612f542e976 Mon Sep 17 00:00:00 2001 From: cuichen Date: Mon, 23 Mar 2026 17:58:31 +0800 Subject: [PATCH 03/14] add CopySpan --- .../domain/trace/entity/loop_span/span.go | 150 ++++++++++++++++++ .../trace/entity/loop_span/span_test.go | 123 ++++++++++++++ 2 files changed, 273 insertions(+) diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span.go b/backend/modules/observability/domain/trace/entity/loop_span/span.go index 131682b32..206f40d6b 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span.go @@ -1000,3 +1000,153 @@ func SizeofSpans(spans SpanList) int { func SizeOfString(s string) int { return len(s) } + +func CopySpans(spans []*Span) []*Span { + result := make([]*Span, 0, len(spans)) + for _, s := range spans { + result = append(result, CopySpan(s)) + } + return result +} + +func CopySpan(s *Span) *Span { + if s == nil { + return nil + } + return &Span{ + StartTime: s.StartTime, + SpanID: s.SpanID, + ParentID: s.ParentID, + TraceID: s.TraceID, + DurationMicros: s.DurationMicros, + CallType: s.CallType, + PSM: s.PSM, + LogID: s.LogID, + WorkspaceID: s.WorkspaceID, + SpanName: s.SpanName, + SpanType: s.SpanType, + Method: s.Method, + StatusCode: s.StatusCode, + Input: s.Input, + Output: s.Output, + ObjectStorage: s.ObjectStorage, + + SystemTagsString: copyMapStringString(s.SystemTagsString), + SystemTagsLong: copyMapStringInt64(s.SystemTagsLong), + SystemTagsDouble: copyMapStringFloat64(s.SystemTagsDouble), + + TagsString: copyMapStringString(s.TagsString), + TagsLong: copyMapStringInt64(s.TagsLong), + TagsDouble: copyMapStringFloat64(s.TagsDouble), + + TagsBool: copyMapStringBool(s.TagsBool), + TagsByte: copyMapStringString(s.TagsByte), + + AttrTos: copyAttrTos(s.AttrTos), + LogicDeleteTime: s.LogicDeleteTime, + Annotations: copyAnnotationList(s.Annotations), + Encryption: s.Encryption, + } +} + +func copyAttrTos(src *AttrTos) *AttrTos { + if src == nil { + return nil + } + return &AttrTos{ + InputDataURL: src.InputDataURL, + OutputDataURL: src.OutputDataURL, + MultimodalData: copyMapStringString(src.MultimodalData), + } +} + +func copyAnnotationList(src AnnotationList) AnnotationList { + if src == nil { + return nil + } + dst := make(AnnotationList, len(src)) + for i, a := range src { + dst[i] = copyAnnotation(a) + } + return dst +} + +func copyAnnotation(src *Annotation) *Annotation { + if src == nil { + return nil + } + var annotationIndex []string + if src.AnnotationIndex != nil { + annotationIndex = make([]string, len(src.AnnotationIndex)) + copy(annotationIndex, src.AnnotationIndex) + } + var corrections []AnnotationCorrection + if src.Corrections != nil { + corrections = make([]AnnotationCorrection, len(src.Corrections)) + copy(corrections, src.Corrections) + } + return &Annotation{ + ID: src.ID, + SpanID: src.SpanID, + TraceID: src.TraceID, + StartTime: src.StartTime, + WorkspaceID: src.WorkspaceID, + AnnotationType: src.AnnotationType, + AnnotationIndex: annotationIndex, + Key: src.Key, + Value: src.Value, + Reasoning: src.Reasoning, + Corrections: corrections, + Metadata: src.Metadata, + Status: src.Status, + CreatedAt: src.CreatedAt, + CreatedBy: src.CreatedBy, + UpdatedAt: src.UpdatedAt, + UpdatedBy: src.UpdatedBy, + IsDeleted: src.IsDeleted, + } +} + +func copyMapStringString(src map[string]string) map[string]string { + if src == nil { + return nil + } + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringInt64(src map[string]int64) map[string]int64 { + if src == nil { + return nil + } + dst := make(map[string]int64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringFloat64(src map[string]float64) map[string]float64 { + if src == nil { + return nil + } + dst := make(map[string]float64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringBool(src map[string]bool) map[string]bool { + if src == nil { + return nil + } + dst := make(map[string]bool, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span_test.go b/backend/modules/observability/domain/trace/entity/loop_span/span_test.go index 28a570254..264726d4c 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span_test.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span_test.go @@ -1160,3 +1160,126 @@ func TestEncryptionInfo(t *testing.T) { assert.False(t, span.Encryption.NeedWorkflow) }) } + +func TestCopySpans(t *testing.T) { + t.Parallel() + now := time.Now() + original := []*Span{ + { + StartTime: 1, + SpanID: "0000000000000001", + TraceID: "00000000000000000000000000000001", + ParentID: "0000000000000000", + DurationMicros: 2, + CallType: "call_type", + PSM: "psm", + LogID: "logid", + WorkspaceID: "space", + SpanName: "name", + SpanType: "type", + Method: "method", + StatusCode: 1, + Input: "input", + Output: "output", + ObjectStorage: "os", + SystemTagsString: map[string]string{ + "s1": "v1", + }, + SystemTagsLong: map[string]int64{ + "s2": 2, + }, + SystemTagsDouble: map[string]float64{ + "s3": 3, + }, + TagsString: map[string]string{ + "k": "v", + }, + TagsLong: map[string]int64{ + "k2": 2, + }, + TagsDouble: map[string]float64{ + "k3": 3, + }, + TagsBool: map[string]bool{ + "k4": true, + }, + TagsByte: map[string]string{ + "k5": "b", + }, + AttrTos: &AttrTos{ + InputDataURL: "in", + OutputDataURL: "out", + MultimodalData: map[string]string{ + "m": "mv", + }, + }, + Annotations: AnnotationList{ + { + ID: "anno1", + SpanID: "0000000000000001", + TraceID: "00000000000000000000000000000001", + StartTime: now, + WorkspaceID: "space", + AnnotationType: AnnotationTypeManualFeedback, + AnnotationIndex: []string{"i1"}, + Key: "key", + Value: NewStringValue("val"), + Reasoning: "r1", + Corrections: []AnnotationCorrection{ + { + Reasoning: "cr1", + Value: NewStringValue("cval"), + Type: AnnotationCorrectionTypeManual, + UpdateAt: now, + UpdatedBy: "u1", + }, + }, + Metadata: &ManualDatasetMetadata{}, + Status: AnnotationStatusNormal, + CreatedAt: now, + CreatedBy: "c1", + UpdatedAt: now, + UpdatedBy: "u1", + IsDeleted: false, + }, + nil, + }, + Encryption: EncryptionInfo{NeedWorkflow: true}, + }, + } + + copied := CopySpans(original) + assert.Len(t, copied, 1) + assert.NotSame(t, original[0], copied[0]) + + copied[0].TagsString["k"] = "changed" + assert.Equal(t, "v", original[0].TagsString["k"]) + + delete(copied[0].SystemTagsLong, "s2") + assert.Equal(t, int64(2), original[0].SystemTagsLong["s2"]) + + copied[0].AttrTos.MultimodalData["m"] = "changed" + assert.Equal(t, "mv", original[0].AttrTos.MultimodalData["m"]) + + assert.Len(t, copied[0].Annotations, 2) + assert.NotNil(t, copied[0].Annotations[0]) + assert.NotSame(t, original[0].Annotations[0], copied[0].Annotations[0]) + assert.Nil(t, copied[0].Annotations[1]) + + copied[0].Annotations[0].AnnotationIndex[0] = "changed" + assert.Equal(t, "i1", original[0].Annotations[0].AnnotationIndex[0]) + + copied[0].Annotations[0].Corrections[0].Reasoning = "changed" + assert.Equal(t, "cr1", original[0].Annotations[0].Corrections[0].Reasoning) + + spanWithNilMaps := &Span{ + SpanID: "0000000000000002", + TraceID: "00000000000000000000000000000002", + } + copiedNil := CopySpan(spanWithNilMaps) + assert.NotNil(t, copiedNil) + assert.Nil(t, copiedNil.TagsString) + assert.Nil(t, copiedNil.SystemTagsString) + assert.Nil(t, copiedNil.Annotations) + assert.Nil(t, copiedNil.AttrTos) +} From 9149ec3cb3c77413eed478051fa66ac085c06faa Mon Sep 17 00:00:00 2001 From: cuichen Date: Mon, 23 Mar 2026 20:49:35 +0800 Subject: [PATCH 04/14] add input/output object --- .../observability/domain/trace/entity/loop_span/span.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span.go b/backend/modules/observability/domain/trace/entity/loop_span/span.go index 206f40d6b..447da31aa 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span.go @@ -144,6 +144,9 @@ type Span struct { Output string `json:"output"` ObjectStorage string `json:"object_storage"` + InputObject map[string]interface{} `json:"input_object"` + OutputObject map[string]interface{} `json:"output_object"` + SystemTagsString map[string]string `json:"system_tags_string"` SystemTagsLong map[string]int64 `json:"system_tags_long"` SystemTagsDouble map[string]float64 `json:"system_tags_double"` From 29e4d24f4d9dae6886045b7c938aefdc2a62b2e5 Mon Sep 17 00:00:00 2001 From: cuichen Date: Fri, 3 Apr 2026 17:39:20 +0800 Subject: [PATCH 05/14] feat(observability): add ObserveConsumer to auto-measure ConsumeTraces duration per node - Add ObserveConsumer decorator that wraps node consumers to measure self-only duration - Add stopwatchConsumer to track downstream elapsed time (total - downstream = self) - Integrate into Graph.buildComponents for Receiver, Processor, and Exporter nodes - Plumb ConsumeMetric through Collector -> Service -> Graph via Settings - Register trace_consume metric in observability infra metrics - Update wire DI to provide ConsumeMetric --- .../api/handler/coze/loop/apis/wire_gen.go | 3 +- .../modules/observability/application/wire.go | 2 + .../observability/application/wire_gen.go | 7 +- .../trace/entity/collector/collector.go | 3 + .../collector/consumer/observe_consumer.go | 88 +++++++++++++ .../consumer/observe_consumer_test.go | 116 ++++++++++++++++++ .../trace/entity/collector/service/graph.go | 70 ++++++++++- .../trace/entity/collector/service/service.go | 2 + .../domain/trace/service/ingestion.go | 3 + .../observability/infra/metrics/metrics.go | 24 ++++ 10 files changed, 309 insertions(+), 9 deletions(-) create mode 100644 backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go create mode 100644 backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go diff --git a/backend/api/handler/coze/loop/apis/wire_gen.go b/backend/api/handler/coze/loop/apis/wire_gen.go index a3e6a5f9b..03f17072e 100644 --- a/backend/api/handler/coze/loop/apis/wire_gen.go +++ b/backend/api/handler/coze/loop/apis/wire_gen.go @@ -8,7 +8,6 @@ package apis import ( "context" - "github.com/cloudwego/kitex/pkg/endpoint" "github.com/coze-dev/coze-loop/backend/infra/ck" "github.com/coze-dev/coze-loop/backend/infra/db" @@ -171,7 +170,7 @@ func InitObservabilityHandler(ctx context.Context, db2 db.Provider, ckDb ck.Prov if err != nil { return nil, err } - iTraceIngestionApplication, err := application6.InitTraceIngestionApplication(configFactory, storageProvider, ckDb, db2, mqFactory, persistentCmdable, idgen2) + iTraceIngestionApplication, err := application6.InitTraceIngestionApplication(configFactory, storageProvider, ckDb, db2, mqFactory, persistentCmdable, idgen2, meter) if err != nil { return nil, err } diff --git a/backend/modules/observability/application/wire.go b/backend/modules/observability/application/wire.go index 804961ccf..3496efa8e 100644 --- a/backend/modules/observability/application/wire.go +++ b/backend/modules/observability/application/wire.go @@ -135,6 +135,7 @@ var ( mq2.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysqldao.NewTrajectoryConfigDaoImpl, + obmetrics.NewConsumeMetric, ) openApiSet = wire.NewSet( NewOpenAPIApplication, @@ -452,6 +453,7 @@ func InitTraceIngestionApplication( mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator, + meter metrics.Meter, ) (ITraceIngestionApplication, error) { wire.Build(traceIngestionSet) return nil, nil diff --git a/backend/modules/observability/application/wire_gen.go b/backend/modules/observability/application/wire_gen.go index 5b1ca3c57..f05d1043b 100644 --- a/backend/modules/observability/application/wire_gen.go +++ b/backend/modules/observability/application/wire_gen.go @@ -229,7 +229,7 @@ func InitMetricApplication(ckDb ck.Provider, storageProvider storage2.IStoragePr return iMetricApplication, nil } -func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, storageProvider storage2.IStorageProvider, ckDb ck.Provider, db2 db.Provider, mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator) (ITraceIngestionApplication, error) { +func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, storageProvider storage2.IStorageProvider, ckDb ck.Provider, db2 db.Provider, mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator, meter metrics.Meter) (ITraceIngestionApplication, error) { iConfigLoader, err := NewTraceConfigLoader(configFactory) if err != nil { return nil, err @@ -249,7 +249,8 @@ func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, stor return nil, err } ingestionCollectorFactory := NewIngestionCollectorFactory(mqFactory, iTraceRepo) - ingestionService, err := service.NewIngestionServiceImpl(iConfigLoader, ingestionCollectorFactory) + metric := metrics2.NewConsumeMetric(meter) + ingestionService, err := service.NewIngestionServiceImpl(iConfigLoader, ingestionCollectorFactory, metric) if err != nil { return nil, err } @@ -340,7 +341,7 @@ var ( ) traceIngestionSet = wire.NewSet( NewIngestionApplication, service.NewIngestionServiceImpl, provideTraceRepo, config.NewTraceConfigCenter, NewTraceConfigLoader, - NewIngestionCollectorFactory, producer.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysql.NewTrajectoryConfigDaoImpl, + NewIngestionCollectorFactory, producer.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysql.NewTrajectoryConfigDaoImpl, metrics2.NewConsumeMetric, ) openApiSet = wire.NewSet( NewOpenAPIApplication, auth.NewAuthProvider, traceDomainSet, time_range.NewTimeRangeProvider, diff --git a/backend/modules/observability/domain/trace/entity/collector/collector.go b/backend/modules/observability/domain/trace/entity/collector/collector.go index 0b13423ec..9051d2387 100644 --- a/backend/modules/observability/domain/trace/entity/collector/collector.go +++ b/backend/modules/observability/domain/trace/entity/collector/collector.go @@ -13,6 +13,7 @@ import ( "os/signal" "syscall" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -109,6 +110,7 @@ func (cfg *Config) Validate() error { type Settings struct { Factories func() (Factories, error) ConfigProvider ConfigProvider + ConsumeMetric metrics.Metric } type Collector struct { @@ -183,6 +185,7 @@ func (col *Collector) setupConfigurationComponentsWithHook(ctx context.Context, ProcessorBuilder: processor.NewBuilder(cfg.Processors, factories.Processors), ExporterBuilder: exporter.NewBuilder(cfg.Exporters, factories.Exporters), PipelineConfig: tenantCfg, + ConsumeMetric: col.set.ConsumeMetric, }) if err != nil { return err diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go new file mode 100644 index 000000000..a8fadd3d3 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go @@ -0,0 +1,88 @@ +package consumer + +import ( + "context" + "sync/atomic" + "time" + + "github.com/coze-dev/coze-loop/backend/infra/metrics" + "github.com/coze-dev/coze-loop/backend/pkg/logs" +) + +type ObserveConsumer struct { + name string + inner Consumer + nextElapsed *atomic.Int64 + metric metrics.Metric +} + +func NewObserveConsumer(name string, inner Consumer, nextElapsed *atomic.Int64, metric metrics.Metric) Consumer { + return &ObserveConsumer{ + name: name, + inner: inner, + nextElapsed: nextElapsed, + metric: metric, + } +} + +func (t *ObserveConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + if t.nextElapsed != nil { + t.nextElapsed.Store(0) + } + + start := time.Now() + err := t.inner.ConsumeTraces(ctx, tds) + total := time.Since(start) + + var selfDuration time.Duration + if t.nextElapsed != nil { + selfDuration = total - time.Duration(t.nextElapsed.Load()) + } else { + selfDuration = total + } + + isErr := err != nil + if t.metric != nil { + logs.CtxInfo(ctx, "ObserveConsumer[%s] ConsumeTraces, self_duration=%s, is_err=%s, spans_count=%d", t.name, selfDuration, boolToStr(isErr), tds.SpansCount()) + t.metric.Emit( + []metrics.T{ + {Name: "node", Value: t.name}, + {Name: "is_err", Value: boolToStr(isErr)}, + }, + metrics.Counter(1, metrics.WithSuffix("throughput")), + metrics.Timer(selfDuration.Microseconds(), metrics.WithSuffix("latency")), + metrics.Counter(int64(tds.SpansCount()), metrics.WithSuffix("spans")), + ) + } + + if err != nil { + logs.CtxWarn(ctx, "ObserveConsumer[%s] ConsumeTraces failed, self_duration=%s, err=%v", t.name, selfDuration, err) + } + return err +} + +type stopwatchConsumer struct { + inner Consumer + elapsed *atomic.Int64 +} + +func NewStopwatchConsumer(inner Consumer, elapsed *atomic.Int64) Consumer { + return &stopwatchConsumer{ + inner: inner, + elapsed: elapsed, + } +} + +func (s *stopwatchConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + start := time.Now() + err := s.inner.ConsumeTraces(ctx, tds) + s.elapsed.Add(time.Since(start).Nanoseconds()) + return err +} + +func boolToStr(b bool) string { + if b { + return "true" + } + return "false" +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go new file mode 100644 index 000000000..9fbea419c --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go @@ -0,0 +1,116 @@ +package consumer + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + metricsmocks "github.com/coze-dev/coze-loop/backend/infra/metrics/mocks" +) + +func TestObserveConsumer_ConsumeTraces_Success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestObserveConsumer_ConsumeTraces_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + + expectedErr := errors.New("consume failed") + inner := &errConsumer{err: expectedErr} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.ErrorIs(t, err, expectedErr) +} + +func TestObserveConsumer_ConsumeTraces_NilMetric(t *testing.T) { + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, nil) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestObserveConsumer_SubtractsNextElapsed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) + + nextElapsed := &atomic.Int64{} + sleepDuration := 50 * time.Millisecond + + inner := &sleepConsumer{ + duration: sleepDuration, + afterSleep: func() { + nextElapsed.Store((100 * time.Millisecond).Nanoseconds()) + }, + } + timed := NewObserveConsumer("test_node", inner, nextElapsed, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestStopwatchConsumer_RecordsElapsed(t *testing.T) { + elapsed := &atomic.Int64{} + inner := &sleepConsumer{duration: 10 * time.Millisecond} + sw := NewStopwatchConsumer(inner, elapsed) + + err := sw.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) + assert.Greater(t, elapsed.Load(), int64(0)) +} + +func TestStopwatchConsumer_AccumulatesElapsed(t *testing.T) { + elapsed := &atomic.Int64{} + inner := &sleepConsumer{duration: 5 * time.Millisecond} + sw := NewStopwatchConsumer(inner, elapsed) + + _ = sw.ConsumeTraces(context.Background(), Traces{}) + first := elapsed.Load() + _ = sw.ConsumeTraces(context.Background(), Traces{}) + second := elapsed.Load() + assert.Greater(t, second, first) +} + +type errConsumer struct { + err error +} + +func (e *errConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + return e.err +} + +type sleepConsumer struct { + duration time.Duration + afterSleep func() +} + +func (s *sleepConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + time.Sleep(s.duration) + if s.afterSleep != nil { + s.afterSleep() + } + return nil +} diff --git a/backend/modules/observability/domain/trace/entity/collector/service/graph.go b/backend/modules/observability/domain/trace/entity/collector/service/graph.go index 24a1d3259..7381a5d7f 100644 --- a/backend/modules/observability/domain/trace/entity/collector/service/graph.go +++ b/backend/modules/observability/domain/trace/entity/collector/service/graph.go @@ -11,10 +11,12 @@ import ( "fmt" "hash/fnv" "strings" + "sync/atomic" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/consumer" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" @@ -76,8 +78,9 @@ func (n *receiverNode) getConsumer() consumer.BaseConsumer { type processorNode struct { nodeID - componentID component.ID + componentID component.ID component.Component + overrideConsumer consumer.Consumer } func newProcessorNode(procID component.ID) *processorNode { @@ -100,13 +103,17 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *processor.B } func (n *processorNode) getConsumer() consumer.BaseConsumer { + if n.overrideConsumer != nil { + return n.overrideConsumer + } return n.Component.(consumer.BaseConsumer) } type exporterNode struct { nodeID - componentID component.ID + componentID component.ID component.Component + overrideConsumer consumer.Consumer } func newExporterNode(exprID component.ID) *exporterNode { @@ -129,6 +136,9 @@ func (n *exporterNode) buildComponent(ctx context.Context, builder *exporter.Bui } func (n *exporterNode) getConsumer() consumer.BaseConsumer { + if n.overrideConsumer != nil { + return n.overrideConsumer + } return n.Component.(consumer.BaseConsumer) } @@ -166,6 +176,7 @@ type pipelineNodes struct { type Graph struct { componentGraph *simple.DirectedGraph pipelineNodes *pipelineNodes + consumeMetric metrics.Metric } func BuildGraph(ctx context.Context, set Settings) (*Graph, error) { @@ -176,6 +187,7 @@ func BuildGraph(ctx context.Context, set Settings) (*Graph, error) { processors: make([]*processorNode, 0), exporters: make(map[int64]*exporterNode), }, + consumeMetric: set.ConsumeMetric, } if err := g.createNodes(set); err != nil { return nil, err @@ -254,11 +266,30 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { node := nodes[i] switch n := node.(type) { case *receiverNode: - err = n.buildComponent(ctx, set.ReceiverBuilder, g.nextConsumers(n.ID())[0]) + next := g.nextConsumers(n.ID())[0] + nextElapsed := g.wrapNextWithStopwatch(&next) + wrappedNext := g.wrapAsObserveConsumer(next, receiverSeed+"/"+n.componentID.String(), nextElapsed) + err = n.buildComponent(ctx, set.ReceiverBuilder, wrappedNext) case *processorNode: - err = n.buildComponent(ctx, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) + next := g.nextConsumers(n.ID())[0] + nextElapsed := g.wrapNextWithStopwatch(&next) + err = n.buildComponent(ctx, set.ProcessorBuilder, next) + if err == nil { + n.overrideConsumer = g.wrapSelfConsumer( + n.Component.(consumer.Consumer), + processorSeed+"/"+n.componentID.String(), + nextElapsed, + ) + } case *exporterNode: err = n.buildComponent(ctx, set.ExporterBuilder) + if err == nil { + n.overrideConsumer = g.wrapSelfConsumer( + n.Component.(consumer.Consumer), + exporterSeed+"/"+n.componentID.String(), + nil, + ) + } case *fanOutNode: err = n.buildComponent(ctx, g.nextConsumers(n.ID())) } @@ -269,6 +300,37 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { return nil } +func (g *Graph) wrapNextWithStopwatch(next *consumer.BaseConsumer) *atomic.Int64 { + if g.consumeMetric == nil { + return nil + } + c, ok := (*next).(consumer.Consumer) + if !ok { + return nil + } + elapsed := &atomic.Int64{} + *next = consumer.NewStopwatchConsumer(c, elapsed) + return elapsed +} + +func (g *Graph) wrapSelfConsumer(c consumer.Consumer, name string, nextElapsed *atomic.Int64) consumer.Consumer { + if g.consumeMetric == nil { + return nil + } + return consumer.NewObserveConsumer(name, c, nextElapsed, g.consumeMetric).(consumer.Consumer) +} + +func (g *Graph) wrapAsObserveConsumer(base consumer.BaseConsumer, name string, nextElapsed *atomic.Int64) consumer.BaseConsumer { + if g.consumeMetric == nil { + return base + } + c, ok := base.(consumer.Consumer) + if !ok { + return base + } + return consumer.NewObserveConsumer(name, c, nextElapsed, g.consumeMetric) +} + func (g *Graph) nextConsumers(nodeID int64) []consumer.BaseConsumer { nextNodes := g.componentGraph.From(nodeID) nexts := make([]consumer.BaseConsumer, 0, nextNodes.Len()) diff --git a/backend/modules/observability/domain/trace/entity/collector/service/service.go b/backend/modules/observability/domain/trace/entity/collector/service/service.go index 39ec1afb0..6e64dff7e 100644 --- a/backend/modules/observability/domain/trace/entity/collector/service/service.go +++ b/backend/modules/observability/domain/trace/entity/collector/service/service.go @@ -10,6 +10,7 @@ import ( "fmt" "sync/atomic" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -50,6 +51,7 @@ type Settings struct { ProcessorBuilder *processor.Builder ExporterBuilder *exporter.Builder PipelineConfig *Config + ConsumeMetric metrics.Metric } type Service struct { diff --git a/backend/modules/observability/domain/trace/service/ingestion.go b/backend/modules/observability/domain/trace/service/ingestion.go index 2fe61ffc6..95395d28a 100644 --- a/backend/modules/observability/domain/trace/service/ingestion.go +++ b/backend/modules/observability/domain/trace/service/ingestion.go @@ -6,6 +6,7 @@ package service import ( "context" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -84,10 +85,12 @@ func (i *IngestionServiceImpl) RunAsync(ctx context.Context) { func NewIngestionServiceImpl( traceConfig conf.IConfigLoader, collectorFactory IngestionCollectorFactory, + consumeMetric metrics.Metric, ) (IngestionService, error) { c, err := collector.New(collector.Settings{ Factories: collectorFactory.GetCollectorFactory, ConfigProvider: collector.NewConfigProvider(traceConfig), + ConsumeMetric: consumeMetric, }) if err != nil { return nil, err diff --git a/backend/modules/observability/infra/metrics/metrics.go b/backend/modules/observability/infra/metrics/metrics.go index 1e81e0098..483dafb5e 100644 --- a/backend/modules/observability/infra/metrics/metrics.go +++ b/backend/modules/observability/infra/metrics/metrics.go @@ -51,6 +51,9 @@ func traceQueryTagNames() []string { var ( traceMetricsOnce sync.Once singletonTraceMetrics metrics2.ITraceMetrics + + consumeMetricOnce sync.Once + singletonConsumeMetric metrics.Metric ) func NewTraceMetricsImpl(meter metrics.Meter) metrics2.ITraceMetrics { @@ -135,3 +138,24 @@ func (t *TraceMetricsImpl) EmitSendMetric(start time.Time, isError bool) { metrics.Counter(1, metrics.WithSuffix(metricSendSuffix+throughputSuffix)), metrics.Timer(time.Since(start).Microseconds(), metrics.WithSuffix(metricSendSuffix+latencySuffix))) } + +const consumeMetricName = "trace_consume" + +func NewConsumeMetric(meter metrics.Meter) metrics.Metric { + consumeMetricOnce.Do(func() { + if meter == nil { + return + } + m, err := meter.NewMetric( + consumeMetricName, + []metrics.MetricType{metrics.MetricTypeCounter, metrics.MetricTypeTimer}, + []string{"node", "is_err"}, + ) + if err != nil { + logs.Error("Failed to create consume metric: %v", err) + return + } + singletonConsumeMetric = m + }) + return singletonConsumeMetric +} From b403125266031d4f3ca2bda8a05afde8294be39a Mon Sep 17 00:00:00 2001 From: cuichen Date: Fri, 3 Apr 2026 18:10:15 +0800 Subject: [PATCH 06/14] feat(observability): add psm/tenant tags to trace_consume metric - Add SpansCountByPSM to group spans by PSM for per-PSM metric emission - Emit metrics grouped by tenant/psm dimensions - Define metric tag and suffix constants in infra/metrics package - Update observe_consumer to reference centralized constants --- .../entity/collector/consumer/consumer.go | 10 ++++++ .../collector/consumer/observe_consumer.go | 24 ++++++++------ .../consumer/observe_consumer_test.go | 31 +++++++++++++++++-- .../observability/infra/metrics/metrics.go | 15 +++++++-- 4 files changed, 66 insertions(+), 14 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go index 62beaab74..f7a9a41fe 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go @@ -28,6 +28,16 @@ func (t *Traces) SpansCount() int { return ret } +func (t *Traces) SpansCountByPSM() map[string]int { + result := make(map[string]int) + for _, trace := range t.TraceData { + for _, span := range trace.SpanList { + result[span.PSM]++ + } + } + return result +} + type BaseConsumer interface{} //go:generate mockgen -destination=mocks/consumer.go -package=mocks . Consumer diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go index a8fadd3d3..e8771936a 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go @@ -6,6 +6,7 @@ import ( "time" "github.com/coze-dev/coze-loop/backend/infra/metrics" + obmetrics "github.com/coze-dev/coze-loop/backend/modules/observability/infra/metrics" "github.com/coze-dev/coze-loop/backend/pkg/logs" ) @@ -44,15 +45,20 @@ func (t *ObserveConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { isErr := err != nil if t.metric != nil { logs.CtxInfo(ctx, "ObserveConsumer[%s] ConsumeTraces, self_duration=%s, is_err=%s, spans_count=%d", t.name, selfDuration, boolToStr(isErr), tds.SpansCount()) - t.metric.Emit( - []metrics.T{ - {Name: "node", Value: t.name}, - {Name: "is_err", Value: boolToStr(isErr)}, - }, - metrics.Counter(1, metrics.WithSuffix("throughput")), - metrics.Timer(selfDuration.Microseconds(), metrics.WithSuffix("latency")), - metrics.Counter(int64(tds.SpansCount()), metrics.WithSuffix("spans")), - ) + psmCounts := tds.SpansCountByPSM() + for psm, count := range psmCounts { + t.metric.Emit( + []metrics.T{ + {Name: obmetrics.ConsumeTagNode, Value: t.name}, + {Name: obmetrics.ConsumeTagIsErr, Value: boolToStr(isErr)}, + {Name: obmetrics.ConsumeTagPSM, Value: psm}, + {Name: obmetrics.ConsumeTagTenant, Value: tds.Tenant}, + }, + metrics.Counter(1, metrics.WithSuffix(obmetrics.ConsumeSuffixThroughput)), + metrics.Timer(selfDuration.Microseconds(), metrics.WithSuffix(obmetrics.ConsumeSuffixLatency)), + metrics.Counter(int64(count), metrics.WithSuffix(obmetrics.ConsumeSuffixSpans)), + ) + } } if err != nil { diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go index 9fbea419c..d7778854e 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go @@ -11,6 +11,8 @@ import ( "go.uber.org/mock/gomock" metricsmocks "github.com/coze-dev/coze-loop/backend/infra/metrics/mocks" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" ) func TestObserveConsumer_ConsumeTraces_Success(t *testing.T) { @@ -18,7 +20,6 @@ func TestObserveConsumer_ConsumeTraces_Success(t *testing.T) { defer ctrl.Finish() mockMetric := metricsmocks.NewMockMetric(ctrl) - mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) inner := &mockConsumer{} timed := NewObserveConsumer("test_node", inner, nil, mockMetric) @@ -32,7 +33,6 @@ func TestObserveConsumer_ConsumeTraces_Error(t *testing.T) { defer ctrl.Finish() mockMetric := metricsmocks.NewMockMetric(ctrl) - mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) expectedErr := errors.New("consume failed") inner := &errConsumer{err: expectedErr} @@ -55,7 +55,6 @@ func TestObserveConsumer_SubtractsNextElapsed(t *testing.T) { defer ctrl.Finish() mockMetric := metricsmocks.NewMockMetric(ctrl) - mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1) nextElapsed := &atomic.Int64{} sleepDuration := 50 * time.Millisecond @@ -72,6 +71,32 @@ func TestObserveConsumer_SubtractsNextElapsed(t *testing.T) { assert.NoError(t, err) } +func TestObserveConsumer_GroupByPSM(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + traces := Traces{ + Tenant: "test_tenant", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + err := timed.ConsumeTraces(context.Background(), traces) + assert.NoError(t, err) +} + func TestStopwatchConsumer_RecordsElapsed(t *testing.T) { elapsed := &atomic.Int64{} inner := &sleepConsumer{duration: 10 * time.Millisecond} diff --git a/backend/modules/observability/infra/metrics/metrics.go b/backend/modules/observability/infra/metrics/metrics.go index 483dafb5e..80b8b84c4 100644 --- a/backend/modules/observability/infra/metrics/metrics.go +++ b/backend/modules/observability/infra/metrics/metrics.go @@ -139,7 +139,18 @@ func (t *TraceMetricsImpl) EmitSendMetric(start time.Time, isError bool) { metrics.Timer(time.Since(start).Microseconds(), metrics.WithSuffix(metricSendSuffix+latencySuffix))) } -const consumeMetricName = "trace_consume" +const ( + consumeMetricName = "trace_consume" + + ConsumeTagNode = "node" + ConsumeTagIsErr = "is_err" + ConsumeTagPSM = "psm" + ConsumeTagTenant = "tenant" + + ConsumeSuffixThroughput = "throughput" + ConsumeSuffixLatency = "latency" + ConsumeSuffixSpans = "spans" +) func NewConsumeMetric(meter metrics.Meter) metrics.Metric { consumeMetricOnce.Do(func() { @@ -149,7 +160,7 @@ func NewConsumeMetric(meter metrics.Meter) metrics.Metric { m, err := meter.NewMetric( consumeMetricName, []metrics.MetricType{metrics.MetricTypeCounter, metrics.MetricTypeTimer}, - []string{"node", "is_err"}, + []string{ConsumeTagNode, ConsumeTagIsErr, ConsumeTagPSM, ConsumeTagTenant}, ) if err != nil { logs.Error("Failed to create consume metric: %v", err) From ff7b2c1b306359cdb45691cc3cf43b8875fe53a0 Mon Sep 17 00:00:00 2001 From: cuichen Date: Fri, 3 Apr 2026 18:37:17 +0800 Subject: [PATCH 07/14] feat(observability): add SpanStats ctx injection with per-pipeline FilteredCount - Add SpanStats context utilities: InjectSpanCounts, AddFilteredSpans, GetSpanStatsEntries, GetSpanStatsEntry - FilteredCount is map[string]int keyed by pipeline name, supporting independent filtering per exporter pipeline - Add InjectConsumer decorator to auto-inject span counts at Receiver entry - Wire InjectConsumer into graph building for Receiver nodes - Add comprehensive tests for SpanStats and InjectConsumer --- .../collector/consumer/inject_consumer.go | 17 +++ .../entity/collector/consumer/span_stats.go | 107 +++++++++++++ .../collector/consumer/span_stats_test.go | 144 ++++++++++++++++++ .../trace/entity/collector/service/graph.go | 9 ++ 4 files changed, 277 insertions(+) create mode 100644 backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go create mode 100644 backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go create mode 100644 backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go new file mode 100644 index 000000000..a1b4d7866 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go @@ -0,0 +1,17 @@ +package consumer + +import "context" + +type injectConsumer struct { + inner Consumer +} + +func NewInjectConsumer(inner Consumer) Consumer { + return &injectConsumer{inner: inner} +} + +func (c *injectConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + ctx = NewSpanStatsContext(ctx) + InjectSpanCounts(ctx, tds) + return c.inner.ConsumeTraces(ctx, tds) +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go new file mode 100644 index 000000000..3f3cac613 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -0,0 +1,107 @@ +package consumer + +import "context" + +type spanStatsKey struct{} + +type SpanStatsEntry struct { + Tenant string + PSM string + InCount int + FilteredCount map[string]int +} + +func (e *SpanStatsEntry) TotalFiltered() int { + total := 0 + for _, c := range e.FilteredCount { + total += c + } + return total +} + +func (e *SpanStatsEntry) GetFiltered(node string) int { + return e.FilteredCount[node] +} + +type SpanStats struct { + entries map[string]*SpanStatsEntry +} + +func newSpanStats() *SpanStats { + return &SpanStats{ + entries: make(map[string]*SpanStatsEntry), + } +} + +func statsKey(tenant, psm string) string { + return tenant + "|" + psm +} + +func NewSpanStatsContext(ctx context.Context) context.Context { + return context.WithValue(ctx, spanStatsKey{}, newSpanStats()) +} + +func getSpanStats(ctx context.Context) *SpanStats { + v, _ := ctx.Value(spanStatsKey{}).(*SpanStats) + return v +} + +func InjectSpanCounts(ctx context.Context, tds Traces) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + for _, trace := range tds.TraceData { + for _, span := range trace.SpanList { + key := statsKey(tds.Tenant, span.PSM) + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tds.Tenant, + PSM: span.PSM, + FilteredCount: make(map[string]int), + } + stats.entries[key] = entry + } + entry.InCount++ + } + } +} + +func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count int) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + key := statsKey(tenant, psm) + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tenant, + PSM: psm, + FilteredCount: make(map[string]int), + } + stats.entries[key] = entry + } + entry.FilteredCount[pipeline] += count +} + +func GetSpanStatsEntries(ctx context.Context) []*SpanStatsEntry { + stats := getSpanStats(ctx) + if stats == nil { + return nil + } + result := make([]*SpanStatsEntry, 0, len(stats.entries)) + for _, entry := range stats.entries { + result = append(result, entry) + } + return result +} + +func GetSpanStatsEntry(ctx context.Context, tenant, psm string) *SpanStatsEntry { + stats := getSpanStats(ctx) + if stats == nil { + return nil + } + return stats.entries[statsKey(tenant, psm)] +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go new file mode 100644 index 000000000..30044e7c9 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go @@ -0,0 +1,144 @@ +package consumer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" +) + +func TestSpanStats_InjectAndGet(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + entryA := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entryA) + assert.Equal(t, 2, entryA.InCount) + assert.Equal(t, 0, entryA.TotalFiltered()) + + entryB := GetSpanStatsEntry(ctx, "tenant_a", "svc-b") + assert.NotNil(t, entryB) + assert.Equal(t, 1, entryB.InCount) + + entries := GetSpanStatsEntries(ctx) + assert.Len(t, entries, 2) +} + +func TestSpanStats_AddFilteredSpans_ByNode(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-a"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + AddFilteredSpans(ctx, "tenant_a", "svc-a", "exporter/ck_online", 1) + AddFilteredSpans(ctx, "tenant_a", "svc-a", "exporter/ck_offline", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 3, entry.InCount) + assert.Equal(t, 1, entry.GetFiltered("exporter/ck_online")) + assert.Equal(t, 2, entry.GetFiltered("exporter/ck_offline")) + assert.Equal(t, 3, entry.TotalFiltered()) +} + +func TestSpanStats_AddFilteredSpans_SameNodeAccumulates(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddFilteredSpans(ctx, "tenant_a", "svc-a", "processor/filter", 3) + AddFilteredSpans(ctx, "tenant_a", "svc-a", "processor/filter", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 5, entry.GetFiltered("processor/filter")) +} + +func TestSpanStats_AddFilteredSpans_NewEntry(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddFilteredSpans(ctx, "tenant_x", "svc-x", "node_a", 5) + + entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") + assert.NotNil(t, entry) + assert.Equal(t, 0, entry.InCount) + assert.Equal(t, 5, entry.GetFiltered("node_a")) + assert.Equal(t, 5, entry.TotalFiltered()) +} + +func TestSpanStats_NilContext(t *testing.T) { + ctx := context.Background() + + InjectSpanCounts(ctx, Traces{}) + AddFilteredSpans(ctx, "t", "p", "n", 1) + + assert.Nil(t, GetSpanStatsEntries(ctx)) + assert.Nil(t, GetSpanStatsEntry(ctx, "t", "p")) +} + +func TestInjectConsumer_InjectsStats(t *testing.T) { + var capturedCtx context.Context + inner := &ctxCapturingConsumer{capture: &capturedCtx} + + ic := NewInjectConsumer(inner) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + + err := ic.ConsumeTraces(context.Background(), tds) + assert.NoError(t, err) + + entries := GetSpanStatsEntries(capturedCtx) + assert.Len(t, entries, 2) + + entryA := GetSpanStatsEntry(capturedCtx, "tenant_a", "svc-a") + assert.NotNil(t, entryA) + assert.Equal(t, 1, entryA.InCount) + + entryB := GetSpanStatsEntry(capturedCtx, "tenant_a", "svc-b") + assert.NotNil(t, entryB) + assert.Equal(t, 1, entryB.InCount) +} + +type ctxCapturingConsumer struct { + capture *context.Context +} + +func (c *ctxCapturingConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + *c.capture = ctx + return nil +} diff --git a/backend/modules/observability/domain/trace/entity/collector/service/graph.go b/backend/modules/observability/domain/trace/entity/collector/service/graph.go index 7381a5d7f..5a5b52b40 100644 --- a/backend/modules/observability/domain/trace/entity/collector/service/graph.go +++ b/backend/modules/observability/domain/trace/entity/collector/service/graph.go @@ -269,6 +269,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { next := g.nextConsumers(n.ID())[0] nextElapsed := g.wrapNextWithStopwatch(&next) wrappedNext := g.wrapAsObserveConsumer(next, receiverSeed+"/"+n.componentID.String(), nextElapsed) + wrappedNext = g.wrapWithInjectConsumer(wrappedNext) err = n.buildComponent(ctx, set.ReceiverBuilder, wrappedNext) case *processorNode: next := g.nextConsumers(n.ID())[0] @@ -331,6 +332,14 @@ func (g *Graph) wrapAsObserveConsumer(base consumer.BaseConsumer, name string, n return consumer.NewObserveConsumer(name, c, nextElapsed, g.consumeMetric) } +func (g *Graph) wrapWithInjectConsumer(base consumer.BaseConsumer) consumer.BaseConsumer { + c, ok := base.(consumer.Consumer) + if !ok { + return base + } + return consumer.NewInjectConsumer(c) +} + func (g *Graph) nextConsumers(nodeID int64) []consumer.BaseConsumer { nextNodes := g.componentGraph.From(nodeID) nexts := make([]consumer.BaseConsumer, 0, nextNodes.Len()) From 2a4926f66c1d38e6fd765a001eb0c2753a44f522 Mon Sep 17 00:00:00 2001 From: cuichen Date: Tue, 7 Apr 2026 18:18:06 +0800 Subject: [PATCH 08/14] feat(observability): add OutCount field with per-pipeline tracking in SpanStats - Add OutCount map[string]int to SpanStatsEntry, keyed by pipeline name - Add AddOutCountSpans function mirroring AddFilteredSpans pattern - Add TotalOutCount and GetOutCount accessor methods - Add tests for OutCount by pipeline, accumulation, and new entry creation --- .../entity/collector/consumer/span_stats.go | 34 ++++++++++++ .../collector/consumer/span_stats_test.go | 54 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go index 3f3cac613..9537810e7 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -9,6 +9,7 @@ type SpanStatsEntry struct { PSM string InCount int FilteredCount map[string]int + OutCount map[string]int } func (e *SpanStatsEntry) TotalFiltered() int { @@ -23,6 +24,18 @@ func (e *SpanStatsEntry) GetFiltered(node string) int { return e.FilteredCount[node] } +func (e *SpanStatsEntry) TotalOutCount() int { + total := 0 + for _, c := range e.OutCount { + total += c + } + return total +} + +func (e *SpanStatsEntry) GetOutCount(pipeline string) int { + return e.OutCount[pipeline] +} + type SpanStats struct { entries map[string]*SpanStatsEntry } @@ -60,6 +73,7 @@ func InjectSpanCounts(ctx context.Context, tds Traces) { Tenant: tds.Tenant, PSM: span.PSM, FilteredCount: make(map[string]int), + OutCount: make(map[string]int), } stats.entries[key] = entry } @@ -80,12 +94,32 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count i Tenant: tenant, PSM: psm, FilteredCount: make(map[string]int), + OutCount: make(map[string]int), } stats.entries[key] = entry } entry.FilteredCount[pipeline] += count } +func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count int) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + key := statsKey(tenant, psm) + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tenant, + PSM: psm, + FilteredCount: make(map[string]int), + OutCount: make(map[string]int), + } + stats.entries[key] = entry + } + entry.OutCount[pipeline] += count +} + func GetSpanStatsEntries(ctx context.Context) []*SpanStatsEntry { stats := getSpanStats(ctx) if stats == nil { diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go index 30044e7c9..8cc73fb51 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go @@ -31,6 +31,7 @@ func TestSpanStats_InjectAndGet(t *testing.T) { assert.NotNil(t, entryA) assert.Equal(t, 2, entryA.InCount) assert.Equal(t, 0, entryA.TotalFiltered()) + assert.Equal(t, 0, entryA.TotalOutCount()) entryB := GetSpanStatsEntry(ctx, "tenant_a", "svc-b") assert.NotNil(t, entryB) @@ -96,11 +97,64 @@ func TestSpanStats_NilContext(t *testing.T) { InjectSpanCounts(ctx, Traces{}) AddFilteredSpans(ctx, "t", "p", "n", 1) + AddOutCountSpans(ctx, "t", "p", "n", 1) assert.Nil(t, GetSpanStatsEntries(ctx)) assert.Nil(t, GetSpanStatsEntry(ctx, "t", "p")) } +func TestSpanStats_AddOutCountSpans_ByPipeline(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-a"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck_online", 2) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck_offline", 1) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 3, entry.InCount) + assert.Equal(t, 2, entry.GetOutCount("exporter/ck_online")) + assert.Equal(t, 1, entry.GetOutCount("exporter/ck_offline")) + assert.Equal(t, 3, entry.TotalOutCount()) +} + +func TestSpanStats_AddOutCountSpans_SamePipelineAccumulates(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck", 3) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 5, entry.GetOutCount("exporter/ck")) +} + +func TestSpanStats_AddOutCountSpans_NewEntry(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddOutCountSpans(ctx, "tenant_x", "svc-x", "pipeline_a", 7) + + entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") + assert.NotNil(t, entry) + assert.Equal(t, 0, entry.InCount) + assert.Equal(t, 0, entry.TotalFiltered()) + assert.Equal(t, 7, entry.GetOutCount("pipeline_a")) + assert.Equal(t, 7, entry.TotalOutCount()) +} + func TestInjectConsumer_InjectsStats(t *testing.T) { var capturedCtx context.Context inner := &ctxCapturingConsumer{capture: &capturedCtx} From c933960a036f57393add21d8dc7e8fc4a57f71f4 Mon Sep 17 00:00:00 2001 From: cuichen Date: Wed, 22 Apr 2026 11:21:52 +0800 Subject: [PATCH 09/14] update SystemTagKeys --- .../domain/trace/entity/loop_span/span.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span.go b/backend/modules/observability/domain/trace/entity/loop_span/span.go index 64f461962..c613adcea 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span.go @@ -997,14 +997,19 @@ func TTLFromInteger(i int64) TTL { } var SystemTagKeys = map[string]bool{ - "dc": true, - "pod_name": true, - "cluster": true, - "deploy_stage": true, - "env": true, - "language": true, - "runtime": true, - "cut_off": true, + "dc": true, + "pod_name": true, + "cluster": true, + "deploy_stage": true, + "env": true, + "language": true, + "runtime": true, + "cut_off": true, + "report_source": true, + "tool_input_tokens": true, + "tool_output_tokens": true, + "model_system_tokens": true, + "model_tool_choice_tokens": true, } func SizeofSpans(spans SpanList) int { From b6d55d89392adee000a7cea11880d658a229b4cb Mon Sep 17 00:00:00 2001 From: cuichen Date: Wed, 22 Apr 2026 16:32:43 +0800 Subject: [PATCH 10/14] update span state --- .../entity/collector/consumer/span_stats.go | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go index 1adafa8e9..b5c9d2611 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -1,6 +1,9 @@ package consumer -import "context" +import ( + "context" + "sync" +) type spanStatsKey struct{} @@ -10,9 +13,12 @@ type SpanStatsEntry struct { InCount int FilteredCount map[string]int OutCount map[string]int + spanStatsLock sync.Mutex } func (e *SpanStatsEntry) TotalFiltered() int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() total := 0 for _, c := range e.FilteredCount { total += c @@ -21,10 +27,14 @@ func (e *SpanStatsEntry) TotalFiltered() int { } func (e *SpanStatsEntry) GetFiltered(node string) int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() return e.FilteredCount[node] } func (e *SpanStatsEntry) TotalOutCount() int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() total := 0 for _, c := range e.OutCount { total += c @@ -33,11 +43,14 @@ func (e *SpanStatsEntry) TotalOutCount() int { } func (e *SpanStatsEntry) GetOutCount(pipeline string) int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() return e.OutCount[pipeline] } type SpanStats struct { entries map[string]*SpanStatsEntry + lock sync.Mutex } func newSpanStats() *SpanStats { @@ -64,6 +77,8 @@ func InjectSpanCounts(ctx context.Context, tds Traces) { if stats == nil { return } + stats.lock.Lock() + defer stats.lock.Unlock() for _, trace := range tds.TraceData { for _, span := range trace.SpanList { key := statsKey(tds.Tenant, span.PSM) @@ -77,7 +92,9 @@ func InjectSpanCounts(ctx context.Context, tds Traces) { } stats.entries[key] = entry } + entry.spanStatsLock.Lock() entry.InCount++ + entry.spanStatsLock.Unlock() } } } @@ -89,6 +106,7 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count i return } key := statsKey(tenant, psm) + stats.lock.Lock() entry, ok := stats.entries[key] if !ok { entry = &SpanStatsEntry{ @@ -99,7 +117,10 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count i } stats.entries[key] = entry } + stats.lock.Unlock() + entry.spanStatsLock.Lock() entry.FilteredCount[pipeline] += count + entry.spanStatsLock.Unlock() } func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count int) { @@ -108,6 +129,7 @@ func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count i return } key := statsKey(tenant, psm) + stats.lock.Lock() entry, ok := stats.entries[key] if !ok { entry = &SpanStatsEntry{ @@ -118,7 +140,10 @@ func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count i } stats.entries[key] = entry } + stats.lock.Unlock() + entry.spanStatsLock.Lock() entry.OutCount[pipeline] += count + entry.spanStatsLock.Unlock() } func GetSpanStatsEntries(ctx context.Context) []*SpanStatsEntry { @@ -126,6 +151,8 @@ func GetSpanStatsEntries(ctx context.Context) []*SpanStatsEntry { if stats == nil { return nil } + stats.lock.Lock() + defer stats.lock.Unlock() result := make([]*SpanStatsEntry, 0, len(stats.entries)) for _, entry := range stats.entries { result = append(result, entry) @@ -138,5 +165,7 @@ func GetSpanStatsEntry(ctx context.Context, tenant, psm string) *SpanStatsEntry if stats == nil { return nil } + stats.lock.Lock() + defer stats.lock.Unlock() return stats.entries[statsKey(tenant, psm)] } From 85b7fa54432e12ee08ff2522dd6554a2222ebcbf Mon Sep 17 00:00:00 2001 From: cuichen Date: Wed, 22 Apr 2026 16:50:21 +0800 Subject: [PATCH 11/14] update span state --- .../trace/entity/collector/consumer/span_stats.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go index b5c9d2611..cfaa939f9 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -42,10 +42,10 @@ func (e *SpanStatsEntry) TotalOutCount() int { return total } -func (e *SpanStatsEntry) GetOutCount(pipeline string) int { +func (e *SpanStatsEntry) GetOutCount(scene string) int { e.spanStatsLock.Lock() defer e.spanStatsLock.Unlock() - return e.OutCount[pipeline] + return e.OutCount[scene] } type SpanStats struct { @@ -99,8 +99,7 @@ func InjectSpanCounts(ctx context.Context, tds Traces) { } } -// pipeline 整体再看下名字 -func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count int) { +func AddFilteredSpans(ctx context.Context, tenant, psm, scene string, count int) { stats := getSpanStats(ctx) if stats == nil { return @@ -119,11 +118,11 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, pipeline string, count i } stats.lock.Unlock() entry.spanStatsLock.Lock() - entry.FilteredCount[pipeline] += count + entry.FilteredCount[scene] += count entry.spanStatsLock.Unlock() } -func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count int) { +func AddOutCountSpans(ctx context.Context, tenant, psm, scene string, count int) { stats := getSpanStats(ctx) if stats == nil { return @@ -142,7 +141,7 @@ func AddOutCountSpans(ctx context.Context, tenant, psm, pipeline string, count i } stats.lock.Unlock() entry.spanStatsLock.Lock() - entry.OutCount[pipeline] += count + entry.OutCount[scene] += count entry.spanStatsLock.Unlock() } From 71055a1ee9b49969190891d11c6aa29b1fcc20af Mon Sep 17 00:00:00 2001 From: cuichen Date: Wed, 22 Apr 2026 17:46:19 +0800 Subject: [PATCH 12/14] update span state --- .../entity/collector/consumer/span_stats.go | 24 ++----------------- .../collector/consumer/span_stats_test.go | 11 --------- 2 files changed, 2 insertions(+), 33 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go index cfaa939f9..96302a4bf 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -16,30 +16,10 @@ type SpanStatsEntry struct { spanStatsLock sync.Mutex } -func (e *SpanStatsEntry) TotalFiltered() int { +func (e *SpanStatsEntry) GetFilteredCount(scene string) int { e.spanStatsLock.Lock() defer e.spanStatsLock.Unlock() - total := 0 - for _, c := range e.FilteredCount { - total += c - } - return total -} - -func (e *SpanStatsEntry) GetFiltered(node string) int { - e.spanStatsLock.Lock() - defer e.spanStatsLock.Unlock() - return e.FilteredCount[node] -} - -func (e *SpanStatsEntry) TotalOutCount() int { - e.spanStatsLock.Lock() - defer e.spanStatsLock.Unlock() - total := 0 - for _, c := range e.OutCount { - total += c - } - return total + return e.FilteredCount[scene] } func (e *SpanStatsEntry) GetOutCount(scene string) int { diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go index 8cc73fb51..6cef1c835 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go @@ -30,8 +30,6 @@ func TestSpanStats_InjectAndGet(t *testing.T) { entryA := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") assert.NotNil(t, entryA) assert.Equal(t, 2, entryA.InCount) - assert.Equal(t, 0, entryA.TotalFiltered()) - assert.Equal(t, 0, entryA.TotalOutCount()) entryB := GetSpanStatsEntry(ctx, "tenant_a", "svc-b") assert.NotNil(t, entryB) @@ -64,9 +62,6 @@ func TestSpanStats_AddFilteredSpans_ByNode(t *testing.T) { entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") assert.NotNil(t, entry) assert.Equal(t, 3, entry.InCount) - assert.Equal(t, 1, entry.GetFiltered("exporter/ck_online")) - assert.Equal(t, 2, entry.GetFiltered("exporter/ck_offline")) - assert.Equal(t, 3, entry.TotalFiltered()) } func TestSpanStats_AddFilteredSpans_SameNodeAccumulates(t *testing.T) { @@ -77,7 +72,6 @@ func TestSpanStats_AddFilteredSpans_SameNodeAccumulates(t *testing.T) { entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") assert.NotNil(t, entry) - assert.Equal(t, 5, entry.GetFiltered("processor/filter")) } func TestSpanStats_AddFilteredSpans_NewEntry(t *testing.T) { @@ -88,8 +82,6 @@ func TestSpanStats_AddFilteredSpans_NewEntry(t *testing.T) { entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") assert.NotNil(t, entry) assert.Equal(t, 0, entry.InCount) - assert.Equal(t, 5, entry.GetFiltered("node_a")) - assert.Equal(t, 5, entry.TotalFiltered()) } func TestSpanStats_NilContext(t *testing.T) { @@ -128,7 +120,6 @@ func TestSpanStats_AddOutCountSpans_ByPipeline(t *testing.T) { assert.Equal(t, 3, entry.InCount) assert.Equal(t, 2, entry.GetOutCount("exporter/ck_online")) assert.Equal(t, 1, entry.GetOutCount("exporter/ck_offline")) - assert.Equal(t, 3, entry.TotalOutCount()) } func TestSpanStats_AddOutCountSpans_SamePipelineAccumulates(t *testing.T) { @@ -150,9 +141,7 @@ func TestSpanStats_AddOutCountSpans_NewEntry(t *testing.T) { entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") assert.NotNil(t, entry) assert.Equal(t, 0, entry.InCount) - assert.Equal(t, 0, entry.TotalFiltered()) assert.Equal(t, 7, entry.GetOutCount("pipeline_a")) - assert.Equal(t, 7, entry.TotalOutCount()) } func TestInjectConsumer_InjectsStats(t *testing.T) { From d185cba4dbc79fd43b8b9f99345d7d22e273c6a6 Mon Sep 17 00:00:00 2001 From: cuichen Date: Tue, 28 Apr 2026 20:52:28 +0800 Subject: [PATCH 13/14] update span state --- .../entity/collector/consumer/span_stats.go | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go index 96302a4bf..6e9858c73 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -12,7 +12,7 @@ type SpanStatsEntry struct { PSM string InCount int FilteredCount map[string]int - OutCount map[string]int + OutCount map[string]map[string]int spanStatsLock sync.Mutex } @@ -22,10 +22,13 @@ func (e *SpanStatsEntry) GetFilteredCount(scene string) int { return e.FilteredCount[scene] } -func (e *SpanStatsEntry) GetOutCount(scene string) int { +func (e *SpanStatsEntry) GetOutCount(scene, step string) int { e.spanStatsLock.Lock() defer e.spanStatsLock.Unlock() - return e.OutCount[scene] + if e.OutCount[scene] == nil { + return 0 + } + return e.OutCount[scene][step] } type SpanStats struct { @@ -68,7 +71,7 @@ func InjectSpanCounts(ctx context.Context, tds Traces) { Tenant: tds.Tenant, PSM: span.PSM, FilteredCount: make(map[string]int), - OutCount: make(map[string]int), + OutCount: make(map[string]map[string]int), } stats.entries[key] = entry } @@ -92,7 +95,7 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, scene string, count int) Tenant: tenant, PSM: psm, FilteredCount: make(map[string]int), - OutCount: make(map[string]int), + OutCount: make(map[string]map[string]int), } stats.entries[key] = entry } @@ -102,7 +105,7 @@ func AddFilteredSpans(ctx context.Context, tenant, psm, scene string, count int) entry.spanStatsLock.Unlock() } -func AddOutCountSpans(ctx context.Context, tenant, psm, scene string, count int) { +func AddOutCountSpans(ctx context.Context, tenant, psm, scene, step string, count int) { stats := getSpanStats(ctx) if stats == nil { return @@ -115,13 +118,16 @@ func AddOutCountSpans(ctx context.Context, tenant, psm, scene string, count int) Tenant: tenant, PSM: psm, FilteredCount: make(map[string]int), - OutCount: make(map[string]int), + OutCount: make(map[string]map[string]int), } stats.entries[key] = entry } stats.lock.Unlock() entry.spanStatsLock.Lock() - entry.OutCount[scene] += count + if entry.OutCount[scene] == nil { + entry.OutCount[scene] = make(map[string]int) + } + entry.OutCount[scene][step] += count entry.spanStatsLock.Unlock() } From e26a80cc6133479d76d81b8f2a538795d0244f50 Mon Sep 17 00:00:00 2001 From: cuichen Date: Tue, 28 Apr 2026 20:57:02 +0800 Subject: [PATCH 14/14] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20span=5Fstats?= =?UTF-8?q?=5Ftest.go=20=E4=B8=AD=20AddOutCountSpans=20=E5=92=8C=20GetOutC?= =?UTF-8?q?ount=20=E5=8F=82=E6=95=B0=E4=B8=8D=E5=8C=B9=E9=85=8D=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修复 AddOutCountSpans 调用,添加缺失的 step 参数 - 修复 GetOutCount 调用,添加缺失的 step 参数 - 所有测试用例已通过 --- .../collector/consumer/span_stats_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go index 6cef1c835..72c046e2b 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go @@ -89,7 +89,7 @@ func TestSpanStats_NilContext(t *testing.T) { InjectSpanCounts(ctx, Traces{}) AddFilteredSpans(ctx, "t", "p", "n", 1) - AddOutCountSpans(ctx, "t", "p", "n", 1) + AddOutCountSpans(ctx, "t", "p", "scene", "step", 1) assert.Nil(t, GetSpanStatsEntries(ctx)) assert.Nil(t, GetSpanStatsEntry(ctx, "t", "p")) @@ -112,36 +112,36 @@ func TestSpanStats_AddOutCountSpans_ByPipeline(t *testing.T) { } InjectSpanCounts(ctx, tds) - AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck_online", 2) - AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck_offline", 1) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck_online", 2) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck_offline", 1) entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") assert.NotNil(t, entry) assert.Equal(t, 3, entry.InCount) - assert.Equal(t, 2, entry.GetOutCount("exporter/ck_online")) - assert.Equal(t, 1, entry.GetOutCount("exporter/ck_offline")) + assert.Equal(t, 2, entry.GetOutCount("exporter", "ck_online")) + assert.Equal(t, 1, entry.GetOutCount("exporter", "ck_offline")) } func TestSpanStats_AddOutCountSpans_SamePipelineAccumulates(t *testing.T) { ctx := NewSpanStatsContext(context.Background()) - AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck", 3) - AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter/ck", 2) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck", 3) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck", 2) entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") assert.NotNil(t, entry) - assert.Equal(t, 5, entry.GetOutCount("exporter/ck")) + assert.Equal(t, 5, entry.GetOutCount("exporter", "ck")) } func TestSpanStats_AddOutCountSpans_NewEntry(t *testing.T) { ctx := NewSpanStatsContext(context.Background()) - AddOutCountSpans(ctx, "tenant_x", "svc-x", "pipeline_a", 7) + AddOutCountSpans(ctx, "tenant_x", "svc-x", "pipeline", "a", 7) entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") assert.NotNil(t, entry) assert.Equal(t, 0, entry.InCount) - assert.Equal(t, 7, entry.GetOutCount("pipeline_a")) + assert.Equal(t, 7, entry.GetOutCount("pipeline", "a")) } func TestInjectConsumer_InjectsStats(t *testing.T) {