diff --git a/backend/modules/observability/application/openapi.go b/backend/modules/observability/application/openapi.go index 537a87ce2..5c56574f3 100644 --- a/backend/modules/observability/application/openapi.go +++ b/backend/modules/observability/application/openapi.go @@ -687,6 +687,14 @@ func (o *OpenAPIApplication) SearchTraceTreeOApi(ctx context.Context, req *opena } }() + if req != nil && req.GetStartTime() == 0 && req.GetEndTime() == 0 && o.timeRange != nil { + st, et := o.timeRange.GetTimeRange(ctx, strconv.FormatInt(req.GetWorkspaceID(), 10), "", req.GetTraceID(), 1000*60*60*24) + if st != nil && et != nil { + req.StartTime = st + req.EndTime = et + } + } + if err = o.validateSearchTraceTreeOApiReq(ctx, req); err != nil { errCode = obErrorx.CommercialCommonInvalidParamCodeCode return nil, err diff --git a/backend/modules/observability/domain/trace/service/trace_service.go b/backend/modules/observability/domain/trace/service/trace_service.go index e0423b40e..79112b584 100644 --- a/backend/modules/observability/domain/trace/service/trace_service.go +++ b/backend/modules/observability/domain/trace/service/trace_service.go @@ -60,6 +60,7 @@ type ListSpansReq struct { SelectColumns []string OmitColumns []string Scene entity.ProcessorScene + NotQueryAnnotation bool } type ListSpansResp struct { @@ -1116,16 +1117,17 @@ func (r *TraceServiceImpl) ListSpans(ctx context.Context, req *ListSpansReq) (*L } st := time.Now() tRes, err := r.traceRepo.ListSpans(ctx, &repo.ListSpansParam{ - WorkSpaceID: strconv.FormatInt(req.WorkspaceID, 10), - Tenants: tenants, - Filters: filters, - StartAt: req.StartTime, - EndAt: req.EndTime, - Limit: req.Limit, - DescByStartTime: req.DescByStartTime, - PageToken: req.PageToken, - SelectColumns: req.SelectColumns, - OmitColumns: req.OmitColumns, + WorkSpaceID: strconv.FormatInt(req.WorkspaceID, 10), + Tenants: tenants, + Filters: filters, + StartAt: req.StartTime, + EndAt: req.EndTime, + Limit: req.Limit, + DescByStartTime: req.DescByStartTime, + PageToken: req.PageToken, + SelectColumns: req.SelectColumns, + OmitColumns: req.OmitColumns, + NotQueryAnnotation: req.NotQueryAnnotation, }) r.metrics.EmitListSpans(req.WorkspaceID, string(req.SpanListType), st, err != nil) if err != nil { @@ -1499,7 +1501,8 @@ func (r *TraceServiceImpl) ListMetadata(ctx context.Context, req *ListMetadataRe "input", "output", }, - DescByStartTime: true, + DescByStartTime: true, + NotQueryAnnotation: true, }) if err != nil { return nil, err diff --git a/backend/modules/observability/domain/trace/service/trace_service_metadata_annotation_test.go b/backend/modules/observability/domain/trace/service/trace_service_metadata_annotation_test.go index 06f7c6030..fa2b6ffa0 100644 --- a/backend/modules/observability/domain/trace/service/trace_service_metadata_annotation_test.go +++ b/backend/modules/observability/domain/trace/service/trace_service_metadata_annotation_test.go @@ -55,6 +55,26 @@ func TestTraceServiceImpl_ListMetadata(t *testing.T) { PlatformType: loop_span.PlatformCozeLoop, } + t.Run("ListMetadata passes NotQueryAnnotation true to repo", func(t *testing.T) { + buildHelperMock.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), req.PlatformType).Return(filterMock, nil) + filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return(nil, true, nil) + tenantProviderMock.EXPECT().GetTenantsByPlatformType(gomock.Any(), req.PlatformType).Return([]string{"tenant1"}, nil) + + traceRepoMock.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) { + assert.True(t, param.NotQueryAnnotation) + return &repo.ListSpansResult{ + Spans: loop_span.SpanList{{SpanID: "span1"}}, + }, nil + }) + metricsMock.EXPECT().EmitListSpans(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()) + buildHelperMock.EXPECT().BuildListSpansProcessors(gomock.Any(), gomock.Any()).Return([]span_processor.Processor{}, nil) + + resp, err := svc.ListMetadata(ctx, req) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + t.Run("dedup by key and sort by frequency desc", func(t *testing.T) { buildHelperMock.EXPECT().BuildPlatformRelatedFilter(gomock.Any(), req.PlatformType).Return(filterMock, nil) filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return(nil, true, nil) diff --git a/backend/modules/observability/domain/trace/service/trace_service_test.go b/backend/modules/observability/domain/trace/service/trace_service_test.go index b75949e51..7aaa5703a 100644 --- a/backend/modules/observability/domain/trace/service/trace_service_test.go +++ b/backend/modules/observability/domain/trace/service/trace_service_test.go @@ -1487,6 +1487,103 @@ func TestTraceServiceImpl_ListSpans(t *testing.T) { }, wantErr: true, }, + { + name: "list spans with NotQueryAnnotation passes flag to repo", + fieldsGetter: func(ctrl *gomock.Controller) fields { + repoMock := repomocks.NewMockITraceRepo(ctrl) + repoMock.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) { + assert.True(t, param.NotQueryAnnotation) + return &repo.ListSpansResult{ + Spans: loop_span.SpanList{{ + TraceID: "123", + SpanID: "234", + }}, + }, nil + }) + confMock := confmocks.NewMockITraceConfig(ctrl) + tenantProviderMock := tenantmocks.NewMockITenantProvider(ctrl) + tenantProviderMock.EXPECT().GetTenantsByPlatformType(gomock.Any(), gomock.Any()).Return([]string{"spans"}, nil).AnyTimes() + filterMock := filtermocks.NewMockFilter(ctrl) + filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{{}}, false, nil) + filterMock.EXPECT().BuildALLSpanFilter(gomock.Any(), gomock.Any()).Return(nil, nil) + filterFactoryMock := filtermocks.NewMockPlatformFilterFactory(ctrl) + filterFactoryMock.EXPECT().GetFilter(gomock.Any(), gomock.Any()).Return(filterMock, nil) + buildHelper := NewTraceFilterProcessorBuilder(filterFactoryMock, map[entity.ProcessorScene][]span_processor.Factory{entity.SceneGetTrace: {}, entity.SceneListSpans: {}, entity.SceneAdvanceInfo: {}, entity.SceneIngestTrace: {}, entity.SceneSearchTraceOApi: {}, entity.SceneListSpansOApi: {}}) + metricsMock := metricmocks.NewMockITraceMetrics(ctrl) + metricsMock.EXPECT().EmitListSpans(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return() + return fields{ + traceRepo: repoMock, + traceConfig: confMock, + buildHelper: buildHelper, + metrics: metricsMock, + tenantProvider: tenantProviderMock, + } + }, + args: args{ + ctx: context.Background(), + req: &ListSpansReq{ + PlatformType: loop_span.PlatformCozeLoop, + Limit: 10, + SpanListType: loop_span.SpanListTypeAllSpan, + NotQueryAnnotation: true, + }, + }, + want: &ListSpansResp{ + Spans: loop_span.SpanList{{ + TraceID: "123", + SpanID: "234", + }}, + }, + }, + { + name: "list spans without NotQueryAnnotation defaults to false", + fieldsGetter: func(ctrl *gomock.Controller) fields { + repoMock := repomocks.NewMockITraceRepo(ctrl) + repoMock.EXPECT().ListSpans(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, param *repo.ListSpansParam) (*repo.ListSpansResult, error) { + assert.False(t, param.NotQueryAnnotation) + return &repo.ListSpansResult{ + Spans: loop_span.SpanList{{ + TraceID: "456", + SpanID: "789", + }}, + }, nil + }) + confMock := confmocks.NewMockITraceConfig(ctrl) + tenantProviderMock := tenantmocks.NewMockITenantProvider(ctrl) + tenantProviderMock.EXPECT().GetTenantsByPlatformType(gomock.Any(), gomock.Any()).Return([]string{"spans"}, nil).AnyTimes() + filterMock := filtermocks.NewMockFilter(ctrl) + filterMock.EXPECT().BuildBasicSpanFilter(gomock.Any(), gomock.Any()).Return([]*loop_span.FilterField{{}}, false, nil) + filterMock.EXPECT().BuildALLSpanFilter(gomock.Any(), gomock.Any()).Return(nil, nil) + filterFactoryMock := filtermocks.NewMockPlatformFilterFactory(ctrl) + filterFactoryMock.EXPECT().GetFilter(gomock.Any(), gomock.Any()).Return(filterMock, nil) + buildHelper := NewTraceFilterProcessorBuilder(filterFactoryMock, map[entity.ProcessorScene][]span_processor.Factory{entity.SceneGetTrace: {}, entity.SceneListSpans: {}, entity.SceneAdvanceInfo: {}, entity.SceneIngestTrace: {}, entity.SceneSearchTraceOApi: {}, entity.SceneListSpansOApi: {}}) + metricsMock := metricmocks.NewMockITraceMetrics(ctrl) + metricsMock.EXPECT().EmitListSpans(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return() + return fields{ + traceRepo: repoMock, + traceConfig: confMock, + buildHelper: buildHelper, + metrics: metricsMock, + tenantProvider: tenantProviderMock, + } + }, + args: args{ + ctx: context.Background(), + req: &ListSpansReq{ + PlatformType: loop_span.PlatformCozeLoop, + Limit: 10, + SpanListType: loop_span.SpanListTypeAllSpan, + }, + }, + want: &ListSpansResp{ + Spans: loop_span.SpanList{{ + TraceID: "456", + SpanID: "789", + }}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/backend/modules/observability/infra/repo/trace.go b/backend/modules/observability/infra/repo/trace.go index 2814d3a79..a9f3c293c 100644 --- a/backend/modules/observability/infra/repo/trace.go +++ b/backend/modules/observability/infra/repo/trace.go @@ -243,16 +243,17 @@ func (t *TraceRepoImpl) ListSpans(ctx context.Context, req *repo.ListSpansParam) } logs.CtxInfo(ctx, "list spans successfully, spans count %d, cost %v", len(spans), time.Since(st)) spanDOList := converter.SpanListPO2DO(spans) - if tableCfg.NeedQueryAnno && !req.NotQueryAnnotation { + if tableCfg.NeedQueryAnno && !req.NotQueryAnnotation && len(spans) > 0 { spanIDs := lo.UniqMap(spans, func(item *dao.Span, _ int) string { return item.SpanID }) + annoStartTime, annoEndTime := spanTimeRange(spans) st = time.Now() annotations, err := annoDao.List(ctx, &dao.ListAnnotationsParam{ Tables: tableCfg.AnnoTables, SpanIDs: spanIDs, - StartTime: time_util.MillSec2MicroSec(req.StartAt), - EndTime: time_util.MillSec2MicroSec(req.EndAt), + StartTime: annoStartTime, + EndTime: annoEndTime, Limit: int32(min(len(spanIDs)*100, 10000)), Extra: spanStorage.StorageConfig, }) @@ -391,16 +392,17 @@ func (t *TraceRepoImpl) GetTrace(ctx context.Context, req *repo.GetTraceParam) ( logs.CtxInfo(ctx, "get trace %s successfully, spans count %d, cost %v", req.TraceID, len(spans), time.Since(st)) spanDOList := converter.SpanListPO2DO(spans) - if tableCfg.NeedQueryAnno && !req.NotQueryAnnotation { + if tableCfg.NeedQueryAnno && !req.NotQueryAnnotation && len(spans) > 0 { spanIDs := lo.UniqMap(spans, func(item *dao.Span, _ int) string { return item.SpanID }) + annoStartTime, annoEndTime := spanTimeRange(spans) st = time.Now() annotations, err := annoDao.List(ctx, &dao.ListAnnotationsParam{ Tables: tableCfg.AnnoTables, SpanIDs: spanIDs, - StartTime: time_util.MillSec2MicroSec(req.StartAt), - EndTime: time_util.MillSec2MicroSec(req.EndAt), + StartTime: annoStartTime, + EndTime: annoEndTime, Limit: int32(min(len(spanIDs)*100, 10000)), Extra: spanStorage.StorageConfig, }) @@ -733,3 +735,17 @@ func parsePageToken(pageToken string) (*PageToken, error) { } return pt, nil } + +func spanTimeRange(spans []*dao.Span) (int64, int64) { + minStart := spans[0].StartTime + maxStart := spans[0].StartTime + for _, s := range spans[1:] { + if s.StartTime < minStart { + minStart = s.StartTime + } + if s.StartTime > maxStart { + maxStart = s.StartTime + } + } + return minStart, maxStart +} diff --git a/backend/modules/observability/infra/repo/trace_test.go b/backend/modules/observability/infra/repo/trace_test.go index d8b2e239e..d91cfd5c8 100644 --- a/backend/modules/observability/infra/repo/trace_test.go +++ b/backend/modules/observability/infra/repo/trace_test.go @@ -1781,3 +1781,244 @@ func TestTraceRepoImpl_ListWorkspaceAnnotations(t *testing.T) { }) } } + +func Test_spanTimeRange(t *testing.T) { + tests := []struct { + name string + spans []*dao.Span + wantStart int64 + wantEnd int64 + }{ + { + name: "single span", + spans: []*dao.Span{ + {StartTime: 1000}, + }, + wantStart: 1000, + wantEnd: 1000, + }, + { + name: "multiple spans ascending", + spans: []*dao.Span{ + {StartTime: 1000}, + {StartTime: 2000}, + {StartTime: 3000}, + }, + wantStart: 1000, + wantEnd: 3000, + }, + { + name: "multiple spans descending", + spans: []*dao.Span{ + {StartTime: 3000}, + {StartTime: 2000}, + {StartTime: 1000}, + }, + wantStart: 1000, + wantEnd: 3000, + }, + { + name: "multiple spans same time", + spans: []*dao.Span{ + {StartTime: 5000}, + {StartTime: 5000}, + }, + wantStart: 5000, + wantEnd: 5000, + }, + { + name: "multiple spans unordered", + spans: []*dao.Span{ + {StartTime: 3000}, + {StartTime: 1000}, + {StartTime: 5000}, + {StartTime: 2000}, + }, + wantStart: 1000, + wantEnd: 5000, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotStart, gotEnd := spanTimeRange(tt.spans) + assert.Equal(t, tt.wantStart, gotStart) + assert.Equal(t, tt.wantEnd, gotEnd) + }) + } +} + +func TestTraceRepoImpl_ListSpans_EmptySpans_SkipsAnnotationQuery(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + spansDaoMock := daomock.NewMockISpansDao(ctrl) + spansDaoMock.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]*dao.Span{}, nil) + annoDaoMock := daomock.NewMockIAnnotationDao(ctrl) + traceConfigMock := confmocks.NewMockITraceConfig(ctrl) + traceConfigMock.EXPECT().GetTenantConfig(gomock.Any()).Return(&config.TenantCfg{ + TenantTables: map[string]map[loop_span.TTL]config.TableCfg{ + "test": { + loop_span.TTL3d: { + SpanTable: "spans", + AnnoTable: "annotations", + }, + }, + }, + TenantsSupportAnnotation: map[string]bool{ + "test": true, + }, + }, nil) + + r, err := NewTraceRepoImpl( + traceConfigMock, + &mockStorageProvider{}, + nil, nil, nil, nil, + WithTraceStorageDaos("ck", spansDaoMock, annoDaoMock), + ) + assert.NoError(t, err) + got, err := r.ListSpans(context.Background(), &repo.ListSpansParam{ + Tenants: []string{"test"}, + Limit: 10, + }) + assert.NoError(t, err) + assert.NotNil(t, got) + assert.Empty(t, got.Spans) +} + +func TestTraceRepoImpl_ListSpans_AnnotationUsesSpanTimeRange(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + spansDaoMock := daomock.NewMockISpansDao(ctrl) + spansDaoMock.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]*dao.Span{ + {SpanID: "s1", StartTime: 5000}, + {SpanID: "s2", StartTime: 3000}, + {SpanID: "s3", StartTime: 8000}, + }, nil) + annoDaoMock := daomock.NewMockIAnnotationDao(ctrl) + annoDaoMock.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, param *dao.ListAnnotationsParam) ([]*dao.Annotation, error) { + assert.Equal(t, int64(3000), param.StartTime) + assert.Equal(t, int64(8000), param.EndTime) + return []*dao.Annotation{}, nil + }) + traceConfigMock := confmocks.NewMockITraceConfig(ctrl) + traceConfigMock.EXPECT().GetTenantConfig(gomock.Any()).Return(&config.TenantCfg{ + TenantTables: map[string]map[loop_span.TTL]config.TableCfg{ + "test": { + loop_span.TTL3d: { + SpanTable: "spans", + AnnoTable: "annotations", + }, + }, + }, + TenantsSupportAnnotation: map[string]bool{ + "test": true, + }, + }, nil) + + r, err := NewTraceRepoImpl( + traceConfigMock, + &mockStorageProvider{}, + nil, nil, nil, nil, + WithTraceStorageDaos("ck", spansDaoMock, annoDaoMock), + ) + assert.NoError(t, err) + got, err := r.ListSpans(context.Background(), &repo.ListSpansParam{ + Tenants: []string{"test"}, + Limit: 10, + StartAt: 1000, + EndAt: 100000, + }) + assert.NoError(t, err) + assert.NotNil(t, got) +} + +func TestTraceRepoImpl_GetTrace_EmptySpans_SkipsAnnotationQuery(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + spansDaoMock := daomock.NewMockISpansDao(ctrl) + spansDaoMock.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]*dao.Span{}, nil) + annoDaoMock := daomock.NewMockIAnnotationDao(ctrl) + traceConfigMock := confmocks.NewMockITraceConfig(ctrl) + traceConfigMock.EXPECT().GetTenantConfig(gomock.Any()).Return(&config.TenantCfg{ + TenantTables: map[string]map[loop_span.TTL]config.TableCfg{ + "test": { + loop_span.TTL3d: { + SpanTable: "spans", + AnnoTable: "annotations", + }, + }, + }, + TenantsSupportAnnotation: map[string]bool{ + "test": true, + }, + }, nil) + + r, err := NewTraceRepoImpl( + traceConfigMock, + &mockStorageProvider{}, + nil, nil, nil, nil, + WithTraceStorageDaos("ck", spansDaoMock, annoDaoMock), + ) + assert.NoError(t, err) + got, err := r.GetTrace(context.Background(), &repo.GetTraceParam{ + TraceID: "trace1", + Tenants: []string{"test"}, + Limit: 1000, + }) + assert.NoError(t, err) + assert.NotNil(t, got) + assert.Empty(t, got.Spans) +} + +func TestTraceRepoImpl_GetTrace_AnnotationUsesSpanTimeRange(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + spansDaoMock := daomock.NewMockISpansDao(ctrl) + spansDaoMock.EXPECT().Get(gomock.Any(), gomock.Any()).Return([]*dao.Span{ + {SpanID: "s1", StartTime: 2000}, + {SpanID: "s2", StartTime: 9000}, + {SpanID: "s3", StartTime: 4000}, + }, nil) + annoDaoMock := daomock.NewMockIAnnotationDao(ctrl) + annoDaoMock.EXPECT().List(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, param *dao.ListAnnotationsParam) ([]*dao.Annotation, error) { + assert.Equal(t, int64(2000), param.StartTime) + assert.Equal(t, int64(9000), param.EndTime) + return []*dao.Annotation{}, nil + }) + traceConfigMock := confmocks.NewMockITraceConfig(ctrl) + traceConfigMock.EXPECT().GetTenantConfig(gomock.Any()).Return(&config.TenantCfg{ + TenantTables: map[string]map[loop_span.TTL]config.TableCfg{ + "test": { + loop_span.TTL3d: { + SpanTable: "spans", + AnnoTable: "annotations", + }, + }, + }, + TenantsSupportAnnotation: map[string]bool{ + "test": true, + }, + }, nil) + + r, err := NewTraceRepoImpl( + traceConfigMock, + &mockStorageProvider{}, + nil, nil, nil, nil, + WithTraceStorageDaos("ck", spansDaoMock, annoDaoMock), + ) + assert.NoError(t, err) + got, err := r.GetTrace(context.Background(), &repo.GetTraceParam{ + TraceID: "trace1", + Tenants: []string{"test"}, + Limit: 1000, + StartAt: 1000, + EndAt: 100000, + }) + assert.NoError(t, err) + assert.NotNil(t, got) +}