diff --git a/backend/api/handler/coze/loop/apis/wire_gen.go b/backend/api/handler/coze/loop/apis/wire_gen.go index 2b20f061c..f2167340b 100644 --- a/backend/api/handler/coze/loop/apis/wire_gen.go +++ b/backend/api/handler/coze/loop/apis/wire_gen.go @@ -8,7 +8,6 @@ package apis import ( "context" - "github.com/cloudwego/kitex/pkg/endpoint" "github.com/coze-dev/coze-loop/backend/infra/ck" "github.com/coze-dev/coze-loop/backend/infra/db" @@ -174,7 +173,7 @@ func InitObservabilityHandler(ctx context.Context, db2 db.Provider, ckDb ck.Prov if err != nil { return nil, err } - iTraceIngestionApplication, err := application6.InitTraceIngestionApplication(configFactory, storageProvider, ckDb, db2, mqFactory, persistentCmdable, idgen2) + iTraceIngestionApplication, err := application6.InitTraceIngestionApplication(configFactory, storageProvider, ckDb, db2, mqFactory, persistentCmdable, idgen2, meter) if err != nil { return nil, err } diff --git a/backend/modules/observability/application/wire.go b/backend/modules/observability/application/wire.go index fb82c90e3..f94e06d41 100644 --- a/backend/modules/observability/application/wire.go +++ b/backend/modules/observability/application/wire.go @@ -139,6 +139,7 @@ var ( mq2.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysqldao.NewTrajectoryConfigDaoImpl, + obmetrics.NewConsumeMetric, ) openApiSet = wire.NewSet( NewOpenAPIApplication, @@ -472,6 +473,7 @@ func InitTraceIngestionApplication( mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator, + meter metrics.Meter, ) (ITraceIngestionApplication, error) { wire.Build(traceIngestionSet) return nil, nil diff --git a/backend/modules/observability/application/wire_gen.go b/backend/modules/observability/application/wire_gen.go index ffe5e2201..4f541a61c 100644 --- a/backend/modules/observability/application/wire_gen.go +++ b/backend/modules/observability/application/wire_gen.go @@ -234,7 +234,7 @@ func InitMetricApplication(ckDb ck.Provider, storageProvider storage2.IStoragePr return iMetricApplication, nil } -func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, storageProvider storage2.IStorageProvider, ckDb ck.Provider, db2 db.Provider, mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator) (ITraceIngestionApplication, error) { +func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, storageProvider storage2.IStorageProvider, ckDb ck.Provider, db2 db.Provider, mqFactory mq.IFactory, persistentCmdable redis.PersistentCmdable, idGenerator idgen.IIDGenerator, meter metrics.Meter) (ITraceIngestionApplication, error) { iConfigLoader, err := NewTraceConfigLoader(configFactory) if err != nil { return nil, err @@ -254,7 +254,8 @@ func InitTraceIngestionApplication(configFactory conf.IConfigLoaderFactory, stor return nil, err } ingestionCollectorFactory := NewIngestionCollectorFactory(mqFactory, iTraceRepo) - ingestionService, err := service.NewIngestionServiceImpl(iConfigLoader, ingestionCollectorFactory) + metric := metrics2.NewConsumeMetric(meter) + ingestionService, err := service.NewIngestionServiceImpl(iConfigLoader, ingestionCollectorFactory, metric) if err != nil { return nil, err } @@ -348,7 +349,7 @@ var ( ) traceIngestionSet = wire.NewSet( NewIngestionApplication, service.NewIngestionServiceImpl, provideTraceRepo, config.NewTraceConfigCenter, NewTraceConfigLoader, - NewIngestionCollectorFactory, producer.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysql.NewTrajectoryConfigDaoImpl, + NewIngestionCollectorFactory, producer.NewSpanWithAnnotationProducerImpl, redis2.NewSpansRedisDaoImpl, mysql.NewTrajectoryConfigDaoImpl, metrics2.NewConsumeMetric, ) openApiSet = wire.NewSet( NewOpenAPIApplication, auth.NewAuthProvider, traceDomainSet, time_range.NewTimeRangeProvider, diff --git a/backend/modules/observability/domain/trace/entity/collector/collector.go b/backend/modules/observability/domain/trace/entity/collector/collector.go index 177e74516..9051d2387 100644 --- a/backend/modules/observability/domain/trace/entity/collector/collector.go +++ b/backend/modules/observability/domain/trace/entity/collector/collector.go @@ -13,6 +13,7 @@ import ( "os/signal" "syscall" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -109,6 +110,7 @@ func (cfg *Config) Validate() error { type Settings struct { Factories func() (Factories, error) ConfigProvider ConfigProvider + ConsumeMetric metrics.Metric } type Collector struct { @@ -135,8 +137,8 @@ func (col *Collector) WaitForReady() { } // 通常在异步线程中进行, 主线程需要等待初始化完成 -func (col *Collector) Run(ctx context.Context) error { - if err := col.setupConfigurationComponents(ctx); err != nil { +func (col *Collector) Run(ctx context.Context, hook func() error) error { + if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil { return err } signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) @@ -151,8 +153,8 @@ func (col *Collector) Run(ctx context.Context) error { } // 同步阻塞执行 -func (col *Collector) RunInOne(ctx context.Context) error { - if err := col.setupConfigurationComponents(ctx); err != nil { +func (col *Collector) RunInOne(ctx context.Context, hook func() error) error { + if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil { return err } signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) @@ -165,7 +167,7 @@ func (col *Collector) RunInOne(ctx context.Context) error { return col.shutdown(ctx) } -func (col *Collector) setupConfigurationComponents(ctx context.Context) error { +func (col *Collector) setupConfigurationComponentsWithHook(ctx context.Context, hook func() error) error { factories, err := col.set.Factories() if err != nil { return err @@ -183,6 +185,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { ProcessorBuilder: processor.NewBuilder(cfg.Processors, factories.Processors), ExporterBuilder: exporter.NewBuilder(cfg.Exporters, factories.Exporters), PipelineConfig: tenantCfg, + ConsumeMetric: col.set.ConsumeMetric, }) if err != nil { return err @@ -195,6 +198,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("failed to start tenant %q, %v", tenantName, err) } } + + if err = hook(); err != nil { + fmt.Printf("hook failed, %v\n", err) + } return nil } diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go index 62beaab74..f7a9a41fe 100644 --- a/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/consumer.go @@ -28,6 +28,16 @@ func (t *Traces) SpansCount() int { return ret } +func (t *Traces) SpansCountByPSM() map[string]int { + result := make(map[string]int) + for _, trace := range t.TraceData { + for _, span := range trace.SpanList { + result[span.PSM]++ + } + } + return result +} + type BaseConsumer interface{} //go:generate mockgen -destination=mocks/consumer.go -package=mocks . Consumer diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go new file mode 100644 index 000000000..a1b4d7866 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/inject_consumer.go @@ -0,0 +1,17 @@ +package consumer + +import "context" + +type injectConsumer struct { + inner Consumer +} + +func NewInjectConsumer(inner Consumer) Consumer { + return &injectConsumer{inner: inner} +} + +func (c *injectConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + ctx = NewSpanStatsContext(ctx) + InjectSpanCounts(ctx, tds) + return c.inner.ConsumeTraces(ctx, tds) +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go new file mode 100644 index 000000000..e8771936a --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer.go @@ -0,0 +1,94 @@ +package consumer + +import ( + "context" + "sync/atomic" + "time" + + "github.com/coze-dev/coze-loop/backend/infra/metrics" + obmetrics "github.com/coze-dev/coze-loop/backend/modules/observability/infra/metrics" + "github.com/coze-dev/coze-loop/backend/pkg/logs" +) + +type ObserveConsumer struct { + name string + inner Consumer + nextElapsed *atomic.Int64 + metric metrics.Metric +} + +func NewObserveConsumer(name string, inner Consumer, nextElapsed *atomic.Int64, metric metrics.Metric) Consumer { + return &ObserveConsumer{ + name: name, + inner: inner, + nextElapsed: nextElapsed, + metric: metric, + } +} + +func (t *ObserveConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + if t.nextElapsed != nil { + t.nextElapsed.Store(0) + } + + start := time.Now() + err := t.inner.ConsumeTraces(ctx, tds) + total := time.Since(start) + + var selfDuration time.Duration + if t.nextElapsed != nil { + selfDuration = total - time.Duration(t.nextElapsed.Load()) + } else { + selfDuration = total + } + + isErr := err != nil + if t.metric != nil { + logs.CtxInfo(ctx, "ObserveConsumer[%s] ConsumeTraces, self_duration=%s, is_err=%s, spans_count=%d", t.name, selfDuration, boolToStr(isErr), tds.SpansCount()) + psmCounts := tds.SpansCountByPSM() + for psm, count := range psmCounts { + t.metric.Emit( + []metrics.T{ + {Name: obmetrics.ConsumeTagNode, Value: t.name}, + {Name: obmetrics.ConsumeTagIsErr, Value: boolToStr(isErr)}, + {Name: obmetrics.ConsumeTagPSM, Value: psm}, + {Name: obmetrics.ConsumeTagTenant, Value: tds.Tenant}, + }, + metrics.Counter(1, metrics.WithSuffix(obmetrics.ConsumeSuffixThroughput)), + metrics.Timer(selfDuration.Microseconds(), metrics.WithSuffix(obmetrics.ConsumeSuffixLatency)), + metrics.Counter(int64(count), metrics.WithSuffix(obmetrics.ConsumeSuffixSpans)), + ) + } + } + + if err != nil { + logs.CtxWarn(ctx, "ObserveConsumer[%s] ConsumeTraces failed, self_duration=%s, err=%v", t.name, selfDuration, err) + } + return err +} + +type stopwatchConsumer struct { + inner Consumer + elapsed *atomic.Int64 +} + +func NewStopwatchConsumer(inner Consumer, elapsed *atomic.Int64) Consumer { + return &stopwatchConsumer{ + inner: inner, + elapsed: elapsed, + } +} + +func (s *stopwatchConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + start := time.Now() + err := s.inner.ConsumeTraces(ctx, tds) + s.elapsed.Add(time.Since(start).Nanoseconds()) + return err +} + +func boolToStr(b bool) string { + if b { + return "true" + } + return "false" +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go new file mode 100644 index 000000000..d7778854e --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/observe_consumer_test.go @@ -0,0 +1,141 @@ +package consumer + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + metricsmocks "github.com/coze-dev/coze-loop/backend/infra/metrics/mocks" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" +) + +func TestObserveConsumer_ConsumeTraces_Success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestObserveConsumer_ConsumeTraces_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + + expectedErr := errors.New("consume failed") + inner := &errConsumer{err: expectedErr} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.ErrorIs(t, err, expectedErr) +} + +func TestObserveConsumer_ConsumeTraces_NilMetric(t *testing.T) { + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, nil) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestObserveConsumer_SubtractsNextElapsed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + + nextElapsed := &atomic.Int64{} + sleepDuration := 50 * time.Millisecond + + inner := &sleepConsumer{ + duration: sleepDuration, + afterSleep: func() { + nextElapsed.Store((100 * time.Millisecond).Nanoseconds()) + }, + } + timed := NewObserveConsumer("test_node", inner, nextElapsed, mockMetric) + + err := timed.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) +} + +func TestObserveConsumer_GroupByPSM(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockMetric := metricsmocks.NewMockMetric(ctrl) + mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + inner := &mockConsumer{} + timed := NewObserveConsumer("test_node", inner, nil, mockMetric) + + traces := Traces{ + Tenant: "test_tenant", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + err := timed.ConsumeTraces(context.Background(), traces) + assert.NoError(t, err) +} + +func TestStopwatchConsumer_RecordsElapsed(t *testing.T) { + elapsed := &atomic.Int64{} + inner := &sleepConsumer{duration: 10 * time.Millisecond} + sw := NewStopwatchConsumer(inner, elapsed) + + err := sw.ConsumeTraces(context.Background(), Traces{}) + assert.NoError(t, err) + assert.Greater(t, elapsed.Load(), int64(0)) +} + +func TestStopwatchConsumer_AccumulatesElapsed(t *testing.T) { + elapsed := &atomic.Int64{} + inner := &sleepConsumer{duration: 5 * time.Millisecond} + sw := NewStopwatchConsumer(inner, elapsed) + + _ = sw.ConsumeTraces(context.Background(), Traces{}) + first := elapsed.Load() + _ = sw.ConsumeTraces(context.Background(), Traces{}) + second := elapsed.Load() + assert.Greater(t, second, first) +} + +type errConsumer struct { + err error +} + +func (e *errConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + return e.err +} + +type sleepConsumer struct { + duration time.Duration + afterSleep func() +} + +func (s *sleepConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + time.Sleep(s.duration) + if s.afterSleep != nil { + s.afterSleep() + } + return nil +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go new file mode 100644 index 000000000..6e9858c73 --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats.go @@ -0,0 +1,156 @@ +package consumer + +import ( + "context" + "sync" +) + +type spanStatsKey struct{} + +type SpanStatsEntry struct { + Tenant string + PSM string + InCount int + FilteredCount map[string]int + OutCount map[string]map[string]int + spanStatsLock sync.Mutex +} + +func (e *SpanStatsEntry) GetFilteredCount(scene string) int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() + return e.FilteredCount[scene] +} + +func (e *SpanStatsEntry) GetOutCount(scene, step string) int { + e.spanStatsLock.Lock() + defer e.spanStatsLock.Unlock() + if e.OutCount[scene] == nil { + return 0 + } + return e.OutCount[scene][step] +} + +type SpanStats struct { + entries map[string]*SpanStatsEntry + lock sync.Mutex +} + +func newSpanStats() *SpanStats { + return &SpanStats{ + entries: make(map[string]*SpanStatsEntry), + } +} + +func statsKey(tenant, psm string) string { + return tenant + "|" + psm +} + +func NewSpanStatsContext(ctx context.Context) context.Context { + return context.WithValue(ctx, spanStatsKey{}, newSpanStats()) +} + +func getSpanStats(ctx context.Context) *SpanStats { + v, _ := ctx.Value(spanStatsKey{}).(*SpanStats) + return v +} + +func InjectSpanCounts(ctx context.Context, tds Traces) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + stats.lock.Lock() + defer stats.lock.Unlock() + for _, trace := range tds.TraceData { + for _, span := range trace.SpanList { + key := statsKey(tds.Tenant, span.PSM) + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tds.Tenant, + PSM: span.PSM, + FilteredCount: make(map[string]int), + OutCount: make(map[string]map[string]int), + } + stats.entries[key] = entry + } + entry.spanStatsLock.Lock() + entry.InCount++ + entry.spanStatsLock.Unlock() + } + } +} + +func AddFilteredSpans(ctx context.Context, tenant, psm, scene string, count int) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + key := statsKey(tenant, psm) + stats.lock.Lock() + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tenant, + PSM: psm, + FilteredCount: make(map[string]int), + OutCount: make(map[string]map[string]int), + } + stats.entries[key] = entry + } + stats.lock.Unlock() + entry.spanStatsLock.Lock() + entry.FilteredCount[scene] += count + entry.spanStatsLock.Unlock() +} + +func AddOutCountSpans(ctx context.Context, tenant, psm, scene, step string, count int) { + stats := getSpanStats(ctx) + if stats == nil { + return + } + key := statsKey(tenant, psm) + stats.lock.Lock() + entry, ok := stats.entries[key] + if !ok { + entry = &SpanStatsEntry{ + Tenant: tenant, + PSM: psm, + FilteredCount: make(map[string]int), + OutCount: make(map[string]map[string]int), + } + stats.entries[key] = entry + } + stats.lock.Unlock() + entry.spanStatsLock.Lock() + if entry.OutCount[scene] == nil { + entry.OutCount[scene] = make(map[string]int) + } + entry.OutCount[scene][step] += count + entry.spanStatsLock.Unlock() +} + +func GetSpanStatsEntries(ctx context.Context) []*SpanStatsEntry { + stats := getSpanStats(ctx) + if stats == nil { + return nil + } + stats.lock.Lock() + defer stats.lock.Unlock() + result := make([]*SpanStatsEntry, 0, len(stats.entries)) + for _, entry := range stats.entries { + result = append(result, entry) + } + return result +} + +func GetSpanStatsEntry(ctx context.Context, tenant, psm string) *SpanStatsEntry { + stats := getSpanStats(ctx) + if stats == nil { + return nil + } + stats.lock.Lock() + defer stats.lock.Unlock() + return stats.entries[statsKey(tenant, psm)] +} diff --git a/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go new file mode 100644 index 000000000..72c046e2b --- /dev/null +++ b/backend/modules/observability/domain/trace/entity/collector/consumer/span_stats_test.go @@ -0,0 +1,187 @@ +package consumer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity" + "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span" +) + +func TestSpanStats_InjectAndGet(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + entryA := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entryA) + assert.Equal(t, 2, entryA.InCount) + + entryB := GetSpanStatsEntry(ctx, "tenant_a", "svc-b") + assert.NotNil(t, entryB) + assert.Equal(t, 1, entryB.InCount) + + entries := GetSpanStatsEntries(ctx) + assert.Len(t, entries, 2) +} + +func TestSpanStats_AddFilteredSpans_ByNode(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-a"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + AddFilteredSpans(ctx, "tenant_a", "svc-a", "exporter/ck_online", 1) + AddFilteredSpans(ctx, "tenant_a", "svc-a", "exporter/ck_offline", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 3, entry.InCount) +} + +func TestSpanStats_AddFilteredSpans_SameNodeAccumulates(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddFilteredSpans(ctx, "tenant_a", "svc-a", "processor/filter", 3) + AddFilteredSpans(ctx, "tenant_a", "svc-a", "processor/filter", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) +} + +func TestSpanStats_AddFilteredSpans_NewEntry(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddFilteredSpans(ctx, "tenant_x", "svc-x", "node_a", 5) + + entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") + assert.NotNil(t, entry) + assert.Equal(t, 0, entry.InCount) +} + +func TestSpanStats_NilContext(t *testing.T) { + ctx := context.Background() + + InjectSpanCounts(ctx, Traces{}) + AddFilteredSpans(ctx, "t", "p", "n", 1) + AddOutCountSpans(ctx, "t", "p", "scene", "step", 1) + + assert.Nil(t, GetSpanStatsEntries(ctx)) + assert.Nil(t, GetSpanStatsEntry(ctx, "t", "p")) +} + +func TestSpanStats_AddOutCountSpans_ByPipeline(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-a"}, + {PSM: "svc-a"}, + }, + }, + }, + } + InjectSpanCounts(ctx, tds) + + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck_online", 2) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck_offline", 1) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 3, entry.InCount) + assert.Equal(t, 2, entry.GetOutCount("exporter", "ck_online")) + assert.Equal(t, 1, entry.GetOutCount("exporter", "ck_offline")) +} + +func TestSpanStats_AddOutCountSpans_SamePipelineAccumulates(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck", 3) + AddOutCountSpans(ctx, "tenant_a", "svc-a", "exporter", "ck", 2) + + entry := GetSpanStatsEntry(ctx, "tenant_a", "svc-a") + assert.NotNil(t, entry) + assert.Equal(t, 5, entry.GetOutCount("exporter", "ck")) +} + +func TestSpanStats_AddOutCountSpans_NewEntry(t *testing.T) { + ctx := NewSpanStatsContext(context.Background()) + + AddOutCountSpans(ctx, "tenant_x", "svc-x", "pipeline", "a", 7) + + entry := GetSpanStatsEntry(ctx, "tenant_x", "svc-x") + assert.NotNil(t, entry) + assert.Equal(t, 0, entry.InCount) + assert.Equal(t, 7, entry.GetOutCount("pipeline", "a")) +} + +func TestInjectConsumer_InjectsStats(t *testing.T) { + var capturedCtx context.Context + inner := &ctxCapturingConsumer{capture: &capturedCtx} + + ic := NewInjectConsumer(inner) + + tds := Traces{ + Tenant: "tenant_a", + TraceData: []*entity.TraceData{ + { + SpanList: loop_span.SpanList{ + {PSM: "svc-a"}, + {PSM: "svc-b"}, + }, + }, + }, + } + + err := ic.ConsumeTraces(context.Background(), tds) + assert.NoError(t, err) + + entries := GetSpanStatsEntries(capturedCtx) + assert.Len(t, entries, 2) + + entryA := GetSpanStatsEntry(capturedCtx, "tenant_a", "svc-a") + assert.NotNil(t, entryA) + assert.Equal(t, 1, entryA.InCount) + + entryB := GetSpanStatsEntry(capturedCtx, "tenant_a", "svc-b") + assert.NotNil(t, entryB) + assert.Equal(t, 1, entryB.InCount) +} + +type ctxCapturingConsumer struct { + capture *context.Context +} + +func (c *ctxCapturingConsumer) ConsumeTraces(ctx context.Context, tds Traces) error { + *c.capture = ctx + return nil +} diff --git a/backend/modules/observability/domain/trace/entity/collector/service/graph.go b/backend/modules/observability/domain/trace/entity/collector/service/graph.go index 24a1d3259..5a5b52b40 100644 --- a/backend/modules/observability/domain/trace/entity/collector/service/graph.go +++ b/backend/modules/observability/domain/trace/entity/collector/service/graph.go @@ -11,10 +11,12 @@ import ( "fmt" "hash/fnv" "strings" + "sync/atomic" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/consumer" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" @@ -76,8 +78,9 @@ func (n *receiverNode) getConsumer() consumer.BaseConsumer { type processorNode struct { nodeID - componentID component.ID + componentID component.ID component.Component + overrideConsumer consumer.Consumer } func newProcessorNode(procID component.ID) *processorNode { @@ -100,13 +103,17 @@ func (n *processorNode) buildComponent(ctx context.Context, builder *processor.B } func (n *processorNode) getConsumer() consumer.BaseConsumer { + if n.overrideConsumer != nil { + return n.overrideConsumer + } return n.Component.(consumer.BaseConsumer) } type exporterNode struct { nodeID - componentID component.ID + componentID component.ID component.Component + overrideConsumer consumer.Consumer } func newExporterNode(exprID component.ID) *exporterNode { @@ -129,6 +136,9 @@ func (n *exporterNode) buildComponent(ctx context.Context, builder *exporter.Bui } func (n *exporterNode) getConsumer() consumer.BaseConsumer { + if n.overrideConsumer != nil { + return n.overrideConsumer + } return n.Component.(consumer.BaseConsumer) } @@ -166,6 +176,7 @@ type pipelineNodes struct { type Graph struct { componentGraph *simple.DirectedGraph pipelineNodes *pipelineNodes + consumeMetric metrics.Metric } func BuildGraph(ctx context.Context, set Settings) (*Graph, error) { @@ -176,6 +187,7 @@ func BuildGraph(ctx context.Context, set Settings) (*Graph, error) { processors: make([]*processorNode, 0), exporters: make(map[int64]*exporterNode), }, + consumeMetric: set.ConsumeMetric, } if err := g.createNodes(set); err != nil { return nil, err @@ -254,11 +266,31 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { node := nodes[i] switch n := node.(type) { case *receiverNode: - err = n.buildComponent(ctx, set.ReceiverBuilder, g.nextConsumers(n.ID())[0]) + next := g.nextConsumers(n.ID())[0] + nextElapsed := g.wrapNextWithStopwatch(&next) + wrappedNext := g.wrapAsObserveConsumer(next, receiverSeed+"/"+n.componentID.String(), nextElapsed) + wrappedNext = g.wrapWithInjectConsumer(wrappedNext) + err = n.buildComponent(ctx, set.ReceiverBuilder, wrappedNext) case *processorNode: - err = n.buildComponent(ctx, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) + next := g.nextConsumers(n.ID())[0] + nextElapsed := g.wrapNextWithStopwatch(&next) + err = n.buildComponent(ctx, set.ProcessorBuilder, next) + if err == nil { + n.overrideConsumer = g.wrapSelfConsumer( + n.Component.(consumer.Consumer), + processorSeed+"/"+n.componentID.String(), + nextElapsed, + ) + } case *exporterNode: err = n.buildComponent(ctx, set.ExporterBuilder) + if err == nil { + n.overrideConsumer = g.wrapSelfConsumer( + n.Component.(consumer.Consumer), + exporterSeed+"/"+n.componentID.String(), + nil, + ) + } case *fanOutNode: err = n.buildComponent(ctx, g.nextConsumers(n.ID())) } @@ -269,6 +301,45 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { return nil } +func (g *Graph) wrapNextWithStopwatch(next *consumer.BaseConsumer) *atomic.Int64 { + if g.consumeMetric == nil { + return nil + } + c, ok := (*next).(consumer.Consumer) + if !ok { + return nil + } + elapsed := &atomic.Int64{} + *next = consumer.NewStopwatchConsumer(c, elapsed) + return elapsed +} + +func (g *Graph) wrapSelfConsumer(c consumer.Consumer, name string, nextElapsed *atomic.Int64) consumer.Consumer { + if g.consumeMetric == nil { + return nil + } + return consumer.NewObserveConsumer(name, c, nextElapsed, g.consumeMetric).(consumer.Consumer) +} + +func (g *Graph) wrapAsObserveConsumer(base consumer.BaseConsumer, name string, nextElapsed *atomic.Int64) consumer.BaseConsumer { + if g.consumeMetric == nil { + return base + } + c, ok := base.(consumer.Consumer) + if !ok { + return base + } + return consumer.NewObserveConsumer(name, c, nextElapsed, g.consumeMetric) +} + +func (g *Graph) wrapWithInjectConsumer(base consumer.BaseConsumer) consumer.BaseConsumer { + c, ok := base.(consumer.Consumer) + if !ok { + return base + } + return consumer.NewInjectConsumer(c) +} + func (g *Graph) nextConsumers(nodeID int64) []consumer.BaseConsumer { nextNodes := g.componentGraph.From(nodeID) nexts := make([]consumer.BaseConsumer, 0, nextNodes.Len()) diff --git a/backend/modules/observability/domain/trace/entity/collector/service/service.go b/backend/modules/observability/domain/trace/entity/collector/service/service.go index 39ec1afb0..6e64dff7e 100644 --- a/backend/modules/observability/domain/trace/entity/collector/service/service.go +++ b/backend/modules/observability/domain/trace/entity/collector/service/service.go @@ -10,6 +10,7 @@ import ( "fmt" "sync/atomic" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -50,6 +51,7 @@ type Settings struct { ProcessorBuilder *processor.Builder ExporterBuilder *exporter.Builder PipelineConfig *Config + ConsumeMetric metrics.Metric } type Service struct { diff --git a/backend/modules/observability/domain/trace/entity/loop_span/span.go b/backend/modules/observability/domain/trace/entity/loop_span/span.go index 4b564cfce..0a18cac66 100644 --- a/backend/modules/observability/domain/trace/entity/loop_span/span.go +++ b/backend/modules/observability/domain/trace/entity/loop_span/span.go @@ -100,6 +100,25 @@ const ( TTL365d TTL = "365d" ) +func GetDurationByTTL(ttl TTL) time.Duration { + switch ttl { + case TTL3d: + return time.Hour * 24 * 3 + case TTL7d: + return time.Hour * 24 * 7 + case TTL30d: + return time.Hour * 24 * 30 + case TTL90d: + return time.Hour * 24 * 90 + case TTL180d: + return time.Hour * 24 * 180 + case TTL365d: + return time.Hour * 24 * 365 + default: + return time.Hour * 24 * 3 + } +} + var TimeTagSlice = []string{ SpanFieldStartTimeFirstResp, SpanFieldLatencyFirstResp, @@ -174,6 +193,9 @@ type Span struct { Output string `json:"output"` ObjectStorage string `json:"object_storage"` + InputObject map[string]interface{} `json:"input_object"` + OutputObject map[string]interface{} `json:"output_object"` + SystemTagsString map[string]string `json:"system_tags_string"` SystemTagsLong map[string]int64 `json:"system_tags_long"` SystemTagsDouble map[string]float64 `json:"system_tags_double"` @@ -983,14 +1005,19 @@ func TTLFromInteger(i int64) TTL { } var SystemTagKeys = map[string]bool{ - "dc": true, - "pod_name": true, - "cluster": true, - "deploy_stage": true, - "env": true, - "language": true, - "runtime": true, - "cut_off": true, + "dc": true, + "pod_name": true, + "cluster": true, + "deploy_stage": true, + "env": true, + "language": true, + "runtime": true, + "cut_off": true, + "report_source": true, + "tool_input_tokens": true, + "tool_output_tokens": true, + "model_system_tokens": true, + "model_tool_choice_tokens": true, } func SizeofSpans(spans SpanList) int { @@ -1104,3 +1131,153 @@ func SizeofSpans(spans SpanList) int { func SizeOfString(s string) int { return len(s) } + +func CopySpans(spans []*Span) []*Span { + result := make([]*Span, 0, len(spans)) + for _, s := range spans { + result = append(result, CopySpan(s)) + } + return result +} + +func CopySpan(s *Span) *Span { + if s == nil { + return nil + } + return &Span{ + StartTime: s.StartTime, + SpanID: s.SpanID, + ParentID: s.ParentID, + TraceID: s.TraceID, + DurationMicros: s.DurationMicros, + CallType: s.CallType, + PSM: s.PSM, + LogID: s.LogID, + WorkspaceID: s.WorkspaceID, + SpanName: s.SpanName, + SpanType: s.SpanType, + Method: s.Method, + StatusCode: s.StatusCode, + Input: s.Input, + Output: s.Output, + ObjectStorage: s.ObjectStorage, + + SystemTagsString: copyMapStringString(s.SystemTagsString), + SystemTagsLong: copyMapStringInt64(s.SystemTagsLong), + SystemTagsDouble: copyMapStringFloat64(s.SystemTagsDouble), + + TagsString: copyMapStringString(s.TagsString), + TagsLong: copyMapStringInt64(s.TagsLong), + TagsDouble: copyMapStringFloat64(s.TagsDouble), + + TagsBool: copyMapStringBool(s.TagsBool), + TagsByte: copyMapStringString(s.TagsByte), + + AttrTos: copyAttrTos(s.AttrTos), + LogicDeleteTime: s.LogicDeleteTime, + Annotations: copyAnnotationList(s.Annotations), + Encryption: s.Encryption, + } +} + +func copyAttrTos(src *AttrTos) *AttrTos { + if src == nil { + return nil + } + return &AttrTos{ + InputDataURL: src.InputDataURL, + OutputDataURL: src.OutputDataURL, + MultimodalData: copyMapStringString(src.MultimodalData), + } +} + +func copyAnnotationList(src AnnotationList) AnnotationList { + if src == nil { + return nil + } + dst := make(AnnotationList, len(src)) + for i, a := range src { + dst[i] = copyAnnotation(a) + } + return dst +} + +func copyAnnotation(src *Annotation) *Annotation { + if src == nil { + return nil + } + var annotationIndex []string + if src.AnnotationIndex != nil { + annotationIndex = make([]string, len(src.AnnotationIndex)) + copy(annotationIndex, src.AnnotationIndex) + } + var corrections []AnnotationCorrection + if src.Corrections != nil { + corrections = make([]AnnotationCorrection, len(src.Corrections)) + copy(corrections, src.Corrections) + } + return &Annotation{ + ID: src.ID, + SpanID: src.SpanID, + TraceID: src.TraceID, + StartTime: src.StartTime, + WorkspaceID: src.WorkspaceID, + AnnotationType: src.AnnotationType, + AnnotationIndex: annotationIndex, + Key: src.Key, + Value: src.Value, + Reasoning: src.Reasoning, + Corrections: corrections, + Metadata: src.Metadata, + Status: src.Status, + CreatedAt: src.CreatedAt, + CreatedBy: src.CreatedBy, + UpdatedAt: src.UpdatedAt, + UpdatedBy: src.UpdatedBy, + IsDeleted: src.IsDeleted, + } +} + +func copyMapStringString(src map[string]string) map[string]string { + if src == nil { + return nil + } + dst := make(map[string]string, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringInt64(src map[string]int64) map[string]int64 { + if src == nil { + return nil + } + dst := make(map[string]int64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringFloat64(src map[string]float64) map[string]float64 { + if src == nil { + return nil + } + dst := make(map[string]float64, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} + +func copyMapStringBool(src map[string]bool) map[string]bool { + if src == nil { + return nil + } + dst := make(map[string]bool, len(src)) + for k, v := range src { + dst[k] = v + } + return dst +} diff --git a/backend/modules/observability/domain/trace/service/ingestion.go b/backend/modules/observability/domain/trace/service/ingestion.go index f439d6cfa..95395d28a 100644 --- a/backend/modules/observability/domain/trace/service/ingestion.go +++ b/backend/modules/observability/domain/trace/service/ingestion.go @@ -6,6 +6,7 @@ package service import ( "context" + "github.com/coze-dev/coze-loop/backend/infra/metrics" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter" "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor" @@ -64,12 +65,16 @@ type IngestionServiceImpl struct { } func (i *IngestionServiceImpl) RunSync(ctx context.Context) error { - return i.c.RunInOne(ctx) + return i.c.RunInOne(ctx, func() error { + return nil + }) } func (i *IngestionServiceImpl) RunAsync(ctx context.Context) { go func() { - err := i.c.Run(ctx) + err := i.c.Run(ctx, func() error { + return nil + }) if err != nil { panic(err) } @@ -80,10 +85,12 @@ func (i *IngestionServiceImpl) RunAsync(ctx context.Context) { func NewIngestionServiceImpl( traceConfig conf.IConfigLoader, collectorFactory IngestionCollectorFactory, + consumeMetric metrics.Metric, ) (IngestionService, error) { c, err := collector.New(collector.Settings{ Factories: collectorFactory.GetCollectorFactory, ConfigProvider: collector.NewConfigProvider(traceConfig), + ConsumeMetric: consumeMetric, }) if err != nil { return nil, err diff --git a/backend/modules/observability/infra/metrics/metrics.go b/backend/modules/observability/infra/metrics/metrics.go index 1e81e0098..80b8b84c4 100644 --- a/backend/modules/observability/infra/metrics/metrics.go +++ b/backend/modules/observability/infra/metrics/metrics.go @@ -51,6 +51,9 @@ func traceQueryTagNames() []string { var ( traceMetricsOnce sync.Once singletonTraceMetrics metrics2.ITraceMetrics + + consumeMetricOnce sync.Once + singletonConsumeMetric metrics.Metric ) func NewTraceMetricsImpl(meter metrics.Meter) metrics2.ITraceMetrics { @@ -135,3 +138,35 @@ func (t *TraceMetricsImpl) EmitSendMetric(start time.Time, isError bool) { metrics.Counter(1, metrics.WithSuffix(metricSendSuffix+throughputSuffix)), metrics.Timer(time.Since(start).Microseconds(), metrics.WithSuffix(metricSendSuffix+latencySuffix))) } + +const ( + consumeMetricName = "trace_consume" + + ConsumeTagNode = "node" + ConsumeTagIsErr = "is_err" + ConsumeTagPSM = "psm" + ConsumeTagTenant = "tenant" + + ConsumeSuffixThroughput = "throughput" + ConsumeSuffixLatency = "latency" + ConsumeSuffixSpans = "spans" +) + +func NewConsumeMetric(meter metrics.Meter) metrics.Metric { + consumeMetricOnce.Do(func() { + if meter == nil { + return + } + m, err := meter.NewMetric( + consumeMetricName, + []metrics.MetricType{metrics.MetricTypeCounter, metrics.MetricTypeTimer}, + []string{ConsumeTagNode, ConsumeTagIsErr, ConsumeTagPSM, ConsumeTagTenant}, + ) + if err != nil { + logs.Error("Failed to create consume metric: %v", err) + return + } + singletonConsumeMetric = m + }) + return singletonConsumeMetric +}