diff --git a/loadgen/visibility/types.go b/loadgen/visibility/types.go new file mode 100644 index 00000000..e4854451 --- /dev/null +++ b/loadgen/visibility/types.go @@ -0,0 +1,48 @@ +// Package visibility provides shared types for the visibility stress test scenario. +// +// These types are imported by both the scenario code (which builds workflow inputs) +// and the worker code (which executes the workflow). They live in loadgen/ rather than +// workers/go/ because the scenario (in the main Go module) cannot import from the +// workers/go module (separate Go module). +package visibility + +import "time" + +// VisibilityWorkerInput is the input for the visibilityStressWorker workflow. +// +// The executor builds one of these per workflow, encoding all the instructions for +// what the workflow should do: which CSAs to update, how long to sleep between updates, +// and whether to intentionally fail or timeout. +type VisibilityWorkerInput struct { + // Each element is one UpsertSearchAttributes call. + // May be empty if updatesPerWF rounds to 0 for this workflow. + CSAUpdates []CSAUpdateGroup `json:"csaUpdates"` + + // Duration to sleep between consecutive CSA updates within the workflow. + Delay time.Duration `json:"delay"` + + // If true, the workflow returns an ApplicationError after completing all CSA updates. + // This produces a "Failed" terminal status in the visibility store. + ShouldFail bool `json:"shouldFail"` + + // If true, the workflow sleeps for 24h after completing CSA updates. + // The executor sets a short execution timeout, so the workflow will be killed, + // producing a "TimedOut" terminal status in the visibility store. + ShouldTimeout bool `json:"shouldTimeout"` + + // TODO: Add memo update support + // MemoUpdates []MemoUpdate `json:"memoUpdates"` + // MemoSizeBytes int `json:"memoSizeBytes"` +} + +// CSAUpdateGroup represents one UpsertSearchAttributes call. +// +// Keys are CSA names (e.g., "VS_Int_01"), values are the values to set. +// The workflow dispatches on the name prefix (VS_Int_, VS_Keyword_, etc.) to +// determine the typed search attribute key. +// +// Note: JSON deserializes all numbers as float64, so the workflow must cast +// float64 → int64 for Int CSAs, and parse string → time.Time for Datetime CSAs. +type CSAUpdateGroup struct { + Attributes map[string]any `json:"attributes"` +} diff --git a/scenarios/visibility_stress.go b/scenarios/visibility_stress.go new file mode 100644 index 00000000..20b35455 --- /dev/null +++ b/scenarios/visibility_stress.go @@ -0,0 +1,1082 @@ +// Package scenarios +// Visibility stress test scenario. +// +// This scenario stress-tests the Temporal visibility store (Elasticsearch or SQL) by +// generating controlled read and write traffic. It uses a hybrid executor + workflow model: +// the executor spawns short-lived workflows with instructions baked into their input (no +// signals), and separate goroutines handle deletes and queries. +// +// Three independent presets control behavior: +// - loadPreset: write traffic (workflow starts, CSA updates, deletes, failure rates) +// - queryPreset: read traffic (List/Count queries with varying filter complexity) +// - csaPreset: which custom search attributes to register and use +package scenarios + +import ( + "context" + "fmt" + "math" + "math/rand" + "strings" + "sync/atomic" + "time" + + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "golang.org/x/time/rate" + "google.golang.org/protobuf/types/known/durationpb" + + "github.com/temporalio/omes/loadgen" + vstypes "github.com/temporalio/omes/loadgen/visibility" +) + +// --------------------------------------------------------------------------- +// Presets +// +// Three preset types control the scenario's behavior. Each can be selected +// via --option flags and overridden with individual --option key=value pairs. +// --------------------------------------------------------------------------- + +// vsLoadPreset controls write-side traffic: workflow creation rate, CSA update +// frequency, explicit deletion rate, and the distribution of terminal statuses. +type vsLoadPreset struct { + WfRPS float64 // Workflow starts per second. + UpdatesPerWF float64 // CSA upsert calls per workflow (can be fractional, e.g., 0.5). + UpdateDelay time.Duration // Sleep between consecutive CSA updates within a workflow. + DeleteRPS float64 // Explicit deletes per second (0 = rely on retention only). + FailPercent float64 // Fraction of WFs that intentionally fail (0.10 = 10%). + TimeoutPercent float64 // Fraction of WFs that intentionally timeout (0.05 = 5%). + // TODO: MemoUpdatesPerWF float64 + // TODO: MemoSizeBytes int +} + +// vsQueryPreset controls read-side traffic: query rate and the weighted distribution +// of query complexity (no-filter, open, closed, simple CSA, compound CSA). +type vsQueryPreset struct { + CountRPS float64 // CountWorkflowExecutions calls per second. + ListRPS float64 // ListWorkflowExecutions calls per second. + ListNoFilterWeight int // Weight for unfiltered list queries. + ListOpenWeight int // Weight for ExecutionStatus = 'Running' queries. + ListClosedWeight int // Weight for closed + time range queries. + ListSimpleCSAWeight int // Weight for single CSA filter queries. + ListCompoundCSAWeight int // Weight for multi-CSA compound filter queries. +} + +var vsLoadPresets = map[string]vsLoadPreset{ + "light": { + WfRPS: 10, UpdatesPerWF: 5, + UpdateDelay: 1 * time.Second, DeleteRPS: 2, + FailPercent: 0.10, TimeoutPercent: 0.05, + }, + "moderate": { + WfRPS: 100, UpdatesPerWF: 10, + UpdateDelay: 1 * time.Second, DeleteRPS: 20, + FailPercent: 0.10, TimeoutPercent: 0.05, + }, + "heavy": { + WfRPS: 1000, UpdatesPerWF: 20, + UpdateDelay: 500 * time.Millisecond, DeleteRPS: 200, + FailPercent: 0.10, TimeoutPercent: 0.05, + }, + "no-failures": { + WfRPS: 100, UpdatesPerWF: 10, + UpdateDelay: 1 * time.Second, DeleteRPS: 20, + FailPercent: 0, TimeoutPercent: 0, + }, +} + +var vsQueryPresets = map[string]vsQueryPreset{ + "light": { + CountRPS: 1, ListRPS: 2, + ListNoFilterWeight: 3, ListOpenWeight: 2, ListClosedWeight: 2, + ListSimpleCSAWeight: 2, ListCompoundCSAWeight: 1, + }, + "moderate": { + CountRPS: 5, ListRPS: 10, + ListNoFilterWeight: 3, ListOpenWeight: 2, ListClosedWeight: 2, + ListSimpleCSAWeight: 2, ListCompoundCSAWeight: 1, + }, + "heavy": { + CountRPS: 10, ListRPS: 25, + ListNoFilterWeight: 1, ListOpenWeight: 2, ListClosedWeight: 2, + ListSimpleCSAWeight: 3, ListCompoundCSAWeight: 2, + }, +} + +var vsCSAPresets = map[string][]string{ + "small": { + "VS_Int_01", "VS_Keyword_01", "VS_Bool_01", + "VS_Double_01", "VS_Text_01", "VS_Datetime_01", + }, + "medium": { + "VS_Int_01", "VS_Int_02", + "VS_Keyword_01", "VS_Keyword_02", + "VS_Bool_01", + "VS_Double_01", "VS_Double_02", + "VS_Text_01", + "VS_Datetime_01", "VS_Datetime_02", + }, + "heavy": { + "VS_Int_01", "VS_Int_02", "VS_Int_03", "VS_Int_04", "VS_Int_05", + "VS_Keyword_01", "VS_Keyword_02", "VS_Keyword_03", "VS_Keyword_04", "VS_Keyword_05", + "VS_Bool_01", "VS_Bool_02", + "VS_Double_01", "VS_Double_02", "VS_Double_03", + "VS_Text_01", "VS_Text_02", "VS_Text_03", + "VS_Datetime_01", "VS_Datetime_02", + }, +} + +var keywordVocabulary = []string{ + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa", + "quebec", "romeo", "sierra", "tango", "uniform", "victor", "whiskey", "xray", + "yankee", "zulu", "red", "blue", "green", "yellow", "orange", "purple", + "black", "white", "silver", "gold", "copper", "iron", "steel", "bronze", + "north", "south", "east", "west", "up", "down", "left", "right", "center", + "spring", "summer", "autumn", "winter", "dawn", "dusk", "noon", "midnight", + "rain", "snow", "wind", "storm", "cloud", "sun", "moon", "star", + "river", "lake", "ocean", "mountain", "valley", "forest", "desert", "island", + "apple", "cherry", "mango", "peach", "plum", "grape", "lemon", "melon", + "oak", "pine", "elm", "ash", "birch", "cedar", "maple", "willow", + "hawk", "wolf", "bear", "deer", "fox", "owl", "eagle", "lion", + "ruby", "jade", "opal", "onyx", +} + +// --------------------------------------------------------------------------- +// CSA Definition +// +// CSA (Custom Search Attribute) names follow the convention VS__, +// e.g., "VS_Int_01", "VS_Keyword_02". The type is inferred from the prefix, +// which is the same convention used by the workflow's type dispatch helper +// and the query generator. +// --------------------------------------------------------------------------- + +type csaType int + +const ( + csaTypeInt csaType = iota + csaTypeKeyword + csaTypeBool + csaTypeDouble + csaTypeText + csaTypeDatetime +) + +type csaDef struct { + Name string + Type csaType +} + +func parseCSAName(name string) (csaDef, error) { + switch { + case strings.HasPrefix(name, "VS_Int_"): + return csaDef{Name: name, Type: csaTypeInt}, nil + case strings.HasPrefix(name, "VS_Keyword_"): + return csaDef{Name: name, Type: csaTypeKeyword}, nil + case strings.HasPrefix(name, "VS_Bool_"): + return csaDef{Name: name, Type: csaTypeBool}, nil + case strings.HasPrefix(name, "VS_Double_"): + return csaDef{Name: name, Type: csaTypeDouble}, nil + case strings.HasPrefix(name, "VS_Text_"): + return csaDef{Name: name, Type: csaTypeText}, nil + case strings.HasPrefix(name, "VS_Datetime_"): + return csaDef{Name: name, Type: csaTypeDatetime}, nil + default: + return csaDef{}, fmt.Errorf("unrecognized CSA prefix in %q", name) + } +} + +func (c csaDef) indexedValueType() enums.IndexedValueType { + switch c.Type { + case csaTypeInt: + return enums.INDEXED_VALUE_TYPE_INT + case csaTypeKeyword: + return enums.INDEXED_VALUE_TYPE_KEYWORD + case csaTypeBool: + return enums.INDEXED_VALUE_TYPE_BOOL + case csaTypeDouble: + return enums.INDEXED_VALUE_TYPE_DOUBLE + case csaTypeText: + return enums.INDEXED_VALUE_TYPE_TEXT + case csaTypeDatetime: + return enums.INDEXED_VALUE_TYPE_DATETIME + default: + return enums.INDEXED_VALUE_TYPE_UNSPECIFIED + } +} + +func randomCSAValue(c csaDef, rng *rand.Rand) any { + switch c.Type { + case csaTypeInt: + return float64(rng.Intn(1001)) + case csaTypeKeyword: + return keywordVocabulary[rng.Intn(len(keywordVocabulary))] + case csaTypeBool: + return rng.Intn(2) == 1 + case csaTypeDouble: + return rng.Float64() * 1000.0 + case csaTypeText: + length := 10 + rng.Intn(41) + b := make([]byte, length) + const chars = "abcdefghijklmnopqrstuvwxyz0123456789 " + for i := range b { + b[i] = chars[rng.Intn(len(chars))] + } + return string(b) + case csaTypeDatetime: + offset := time.Duration(rng.Int63n(int64(30 * 24 * time.Hour))) + return time.Now().Add(-offset).UTC().Format(time.RFC3339) + default: + return nil + } +} + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + +type vsConfig struct { + Load *vsLoadPreset + Query *vsQueryPreset + CSADefs []csaDef + NamespaceCount int + CreateNamespaces bool + Retention time.Duration + Cleanup bool + DeleteNamespaces bool +} + +// --------------------------------------------------------------------------- +// Executor +// --------------------------------------------------------------------------- + +type visibilityStressExecutor struct { + config *vsConfig + clients []client.Client + namespaces []string + taskQueue string + executionID string + rng *rand.Rand + + totalCreated atomic.Int64 + totalDeleted atomic.Int64 + totalQueries atomic.Int64 + totalErrors atomic.Int64 + wfCounter atomic.Uint64 +} + +var _ loadgen.Configurable = (*visibilityStressExecutor)(nil) + +func init() { + loadgen.MustRegisterScenario(loadgen.Scenario{ + Description: "Visibility store stress test.\n" + + "Options: loadPreset, queryPreset, csaPreset, namespaceCount, createNamespaces, retention, cleanup.\n" + + "Duration must be set. At least one of loadPreset or queryPreset required.", + ExecutorFn: func() loadgen.Executor { return &visibilityStressExecutor{} }, + }) +} + +// Configure parses and validates all scenario options (presets + overrides). +// Called before Run. In cleanup mode, preset validation is skipped. +func (e *visibilityStressExecutor) Configure(info loadgen.ScenarioInfo) error { + cfg := &vsConfig{ + NamespaceCount: info.ScenarioOptionInt("namespaceCount", 1), + CreateNamespaces: info.ScenarioOptionBool("createNamespaces", false), + Cleanup: info.ScenarioOptionBool("cleanup", false), + DeleteNamespaces: info.ScenarioOptionBool("deleteNamespaces", false), + } + + retentionStr := info.ScenarioOptionString("retention", "168h") + var err error + cfg.Retention, err = time.ParseDuration(retentionStr) + if err != nil { + return fmt.Errorf("invalid retention %q: %w", retentionStr, err) + } + if cfg.Retention < 24*time.Hour { + return fmt.Errorf("retention must be >= 24h, got %v", cfg.Retention) + } + if cfg.NamespaceCount < 1 { + return fmt.Errorf("namespaceCount must be >= 1, got %d", cfg.NamespaceCount) + } + + if cfg.Cleanup { + e.config = cfg + return nil + } + + if info.Configuration.Duration == 0 && info.Configuration.Iterations == 0 { + return fmt.Errorf("visibility_stress requires --duration") + } + if info.Configuration.Iterations > 0 { + return fmt.Errorf("visibility_stress does not support --iterations; use --duration") + } + + // Load preset. + if presetName, ok := info.ScenarioOptions["loadPreset"]; ok { + preset, found := vsLoadPresets[presetName] + if !found { + return fmt.Errorf("unknown loadPreset %q", presetName) + } + if v := info.ScenarioOptions["wfRPS"]; v != "" { + preset.WfRPS = info.ScenarioOptionFloat("wfRPS", preset.WfRPS) + } + if v := info.ScenarioOptions["updatesPerWF"]; v != "" { + preset.UpdatesPerWF = info.ScenarioOptionFloat("updatesPerWF", preset.UpdatesPerWF) + } + if v := info.ScenarioOptions["deleteRPS"]; v != "" { + preset.DeleteRPS = info.ScenarioOptionFloat("deleteRPS", preset.DeleteRPS) + } + if v := info.ScenarioOptions["failPercent"]; v != "" { + preset.FailPercent = info.ScenarioOptionFloat("failPercent", preset.FailPercent) + } + if v := info.ScenarioOptions["timeoutPercent"]; v != "" { + preset.TimeoutPercent = info.ScenarioOptionFloat("timeoutPercent", preset.TimeoutPercent) + } + if preset.FailPercent+preset.TimeoutPercent >= 1.0 { + return fmt.Errorf("failPercent + timeoutPercent must be < 1.0, got %.2f + %.2f", preset.FailPercent, preset.TimeoutPercent) + } + if preset.DeleteRPS < 0 { + return fmt.Errorf("deleteRPS must be non-negative") + } + if preset.WfRPS <= 0 { + return fmt.Errorf("wfRPS must be positive") + } + cfg.Load = &preset + } + + // Query preset. + if presetName, ok := info.ScenarioOptions["queryPreset"]; ok { + preset, found := vsQueryPresets[presetName] + if !found { + return fmt.Errorf("unknown queryPreset %q", presetName) + } + if v := info.ScenarioOptions["countRPS"]; v != "" { + preset.CountRPS = info.ScenarioOptionFloat("countRPS", preset.CountRPS) + } + if v := info.ScenarioOptions["listRPS"]; v != "" { + preset.ListRPS = info.ScenarioOptionFloat("listRPS", preset.ListRPS) + } + cfg.Query = &preset + } + + if cfg.Load == nil && cfg.Query == nil { + return fmt.Errorf("at least one of loadPreset or queryPreset must be set") + } + + // CSA preset. + csaPresetName := info.ScenarioOptionString("csaPreset", "") + if csaPresetName == "" { + if cfg.Load != nil { + csaPresetName = "medium" + } else { + return fmt.Errorf("csaPreset is required in read-only mode (no loadPreset)") + } + } + csaNames, found := vsCSAPresets[csaPresetName] + if !found { + return fmt.Errorf("unknown csaPreset %q", csaPresetName) + } + cfg.CSADefs = make([]csaDef, len(csaNames)) + for i, name := range csaNames { + cfg.CSADefs[i], err = parseCSAName(name) + if err != nil { + return err + } + } + + e.config = cfg + return nil +} + +// Run is the main entry point. It configures the executor, sets up namespaces and CSAs, +// then runs the steady-state phase (writer + deleter + querier goroutines) for --duration. +func (e *visibilityStressExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error { + if err := e.Configure(info); err != nil { + return fmt.Errorf("configuration error: %w", err) + } + + e.taskQueue = loadgen.TaskQueueForRun(info.RunID) + e.executionID = info.ExecutionID + e.rng = rand.New(rand.NewSource(time.Now().UnixNano())) + + // Resolve namespaces. + if e.config.NamespaceCount == 1 { + e.namespaces = []string{info.Namespace} + } else { + e.namespaces = make([]string, e.config.NamespaceCount) + for i := range e.namespaces { + e.namespaces[i] = fmt.Sprintf("vs-stress-%s-%d", info.RunID, i) + } + } + + // Setup namespaces (multi-NS only). + if e.config.NamespaceCount > 1 { + if err := e.setupNamespaces(ctx, info); err != nil { + return err + } + } + + // Dial clients. For single-NS, reuse info.Client. For multi-NS, we currently + // only support single-NS properly (multi-NS dialing needs connection params + // exposed via ScenarioInfo). + // TODO: Support multi-namespace client dialing by exposing ClientOptions in ScenarioInfo. + e.clients = make([]client.Client, len(e.namespaces)) + for i := range e.namespaces { + e.clients[i] = info.Client + } + + if e.config.Cleanup { + return e.runCleanup(ctx, info) + } + + if err := e.registerCSAs(ctx, info); err != nil { + return err + } + + // Wait for at least one worker to be polling each namespace's task queue + // before starting the steady-state phase. This lets users start the scenario + // before the workers (e.g., for multi-namespace where the scenario creates + // namespaces that workers need). + if e.config.Load != nil { + if err := e.waitForWorkers(ctx, info); err != nil { + return err + } + } + + e.logConfig(info) + return e.runSteadyState(ctx, info) +} + +// --------------------------------------------------------------------------- +// Setup +// --------------------------------------------------------------------------- + +func (e *visibilityStressExecutor) setupNamespaces(ctx context.Context, info loadgen.ScenarioInfo) error { + if !e.config.CreateNamespaces { + for _, ns := range e.namespaces { + _, err := info.Client.WorkflowService().DescribeNamespace(ctx, &workflowservice.DescribeNamespaceRequest{ + Namespace: ns, + }) + if err != nil { + return fmt.Errorf("namespace %s does not exist (pass createNamespaces=true to create): %w", ns, err) + } + } + return nil + } + + for _, ns := range e.namespaces { + _, err := info.Client.WorkflowService().RegisterNamespace(ctx, &workflowservice.RegisterNamespaceRequest{ + Namespace: ns, + WorkflowExecutionRetentionPeriod: durationpb.New(e.config.Retention), + }) + if err != nil && !strings.Contains(err.Error(), "already exists") { + return fmt.Errorf("failed to create namespace %s: %w", ns, err) + } + info.Logger.Infof("Namespace %s ready", ns) + } + return nil +} + +// registerCSAs registers all CSAs from the csaPreset on every namespace, then polls +// ListSearchAttributes until they're visible (propagation can take time on Elasticsearch). +// "Already exists" errors are treated as success (idempotent). +func (e *visibilityStressExecutor) registerCSAs(ctx context.Context, info loadgen.ScenarioInfo) error { + saMap := make(map[string]enums.IndexedValueType, len(e.config.CSADefs)) + for _, csa := range e.config.CSADefs { + saMap[csa.Name] = csa.indexedValueType() + } + + for i, ns := range e.namespaces { + _, err := e.clients[i].OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ + SearchAttributes: saMap, + Namespace: ns, + }) + if err != nil && !strings.Contains(err.Error(), "already exists") { + return fmt.Errorf("failed to register CSAs on namespace %s: %w", ns, err) + } + } + + // Poll until CSAs are visible. + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + allReady := true + for i, ns := range e.namespaces { + resp, err := e.clients[i].OperatorService().ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{ + Namespace: ns, + }) + if err != nil { + allReady = false + break + } + for _, csa := range e.config.CSADefs { + if _, ok := resp.CustomAttributes[csa.Name]; !ok { + allReady = false + break + } + } + if !allReady { + break + } + } + if allReady { + info.Logger.Infof("All %d CSAs propagated on %d namespace(s)", len(e.config.CSADefs), len(e.namespaces)) + return nil + } + time.Sleep(500 * time.Millisecond) + } + return fmt.Errorf("CSAs did not propagate within 30s") +} + +// waitForWorkers polls DescribeTaskQueue on each namespace until at least one workflow +// poller is detected. This allows the scenario to be started before workers (useful for +// multi-namespace where the scenario creates namespaces that workers need to connect to). +// Times out after 2 minutes. +func (e *visibilityStressExecutor) waitForWorkers(ctx context.Context, info loadgen.ScenarioInfo) error { + deadline := time.Now().Add(2 * time.Minute) + info.Logger.Infof("Waiting for workers to start polling task queue %s on %d namespace(s)...", e.taskQueue, len(e.namespaces)) + + for time.Now().Before(deadline) { + allReady := true + for i, ns := range e.namespaces { + resp, err := e.clients[i].DescribeTaskQueue(ctx, e.taskQueue, enums.TASK_QUEUE_TYPE_WORKFLOW) + if err != nil { + info.Logger.Debugf("DescribeTaskQueue failed for %s: %v", ns, err) + allReady = false + break + } + if len(resp.Pollers) == 0 { + allReady = false + break + } + } + if allReady { + info.Logger.Infof("Workers detected on all %d namespace(s)", len(e.namespaces)) + return nil + } + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for workers") + case <-time.After(1 * time.Second): + } + } + return fmt.Errorf("timed out waiting for workers after 2m; ensure workers are running with --task-queue %s on each namespace", e.taskQueue) +} + +func (e *visibilityStressExecutor) logConfig(info loadgen.ScenarioInfo) { + var mode string + switch { + case e.config.Load != nil && e.config.Query != nil: + mode = "write+read" + case e.config.Load != nil: + mode = "write-only" + default: + mode = "read-only" + } + info.Logger.Infof("Mode: %s", mode) + info.Logger.Infof("Namespaces: %v", e.namespaces) + info.Logger.Infof("Task queue: %s", e.taskQueue) + info.Logger.Infof("CSAs: %d total", len(e.config.CSADefs)) + + if l := e.config.Load; l != nil { + info.Logger.Infof("Write: wfRPS=%.1f, updatesPerWF=%.1f, effective CSA update RPS≈%.0f, deleteRPS=%.1f", + l.WfRPS, l.UpdatesPerWF, l.WfRPS*l.UpdatesPerWF, l.DeleteRPS) + info.Logger.Infof(" failPercent=%.2f, timeoutPercent=%.2f, updateDelay=%v", + l.FailPercent, l.TimeoutPercent, l.UpdateDelay) + } + if q := e.config.Query; q != nil { + info.Logger.Infof("Read: countRPS=%.1f, listRPS=%.1f", q.CountRPS, q.ListRPS) + } +} + +// --------------------------------------------------------------------------- +// Steady-State +// --------------------------------------------------------------------------- + +func (e *visibilityStressExecutor) runSteadyState(ctx context.Context, info loadgen.ScenarioInfo) error { + ctx, cancel := context.WithTimeout(ctx, info.Configuration.Duration) + defer cancel() + + // Writer goroutine. + if e.config.Load != nil { + go e.runWriter(ctx, info) + } + + // Deleter goroutines. + if e.config.Load != nil && e.config.Load.DeleteRPS > 0 { + go e.runDeleters(ctx, info) + } + + // Querier goroutine. + if e.config.Query != nil { + go e.runQuerier(ctx, info) + } + + <-ctx.Done() + + // Wait for in-flight workflows to complete before returning. This is a pragmatic + // workaround: run-scenario-with-worker kills the worker as soon as Run() returns, + // but workflows started near the end of --duration haven't finished yet. Sleeping + // here delays Run()'s return, keeping the worker alive for the drain period. + // The writer/deleter/querier goroutines have already exited (their ctx is cancelled). + if e.config.Load != nil { + drainTime := time.Duration(math.Ceil(e.config.Load.UpdatesPerWF)) * e.config.Load.UpdateDelay + drainTime += 5 * time.Second // buffer for scheduling delays + info.Logger.Infof("Draining: waiting %v for in-flight workflows to complete...", drainTime) + time.Sleep(drainTime) + } + + info.Logger.Infof("Run complete. Created: %d, Deleted: %d, Queries: %d, Errors: %d", + e.totalCreated.Load(), e.totalDeleted.Load(), e.totalQueries.Load(), e.totalErrors.Load()) + return nil +} + +// runWriter starts workflows at the configured wfRPS rate. Each workflow is fire-and-forget: +// we don't wait for completion. The workflow input encodes all CSA update instructions, +// failure/timeout behavior, and delay between updates. +func (e *visibilityStressExecutor) runWriter(ctx context.Context, info loadgen.ScenarioInfo) { + limiter := rate.NewLimiter(rate.Limit(e.config.Load.WfRPS), 1) + var nsIndex int + startTime := time.Now() + var tickCount int64 + + for { + if err := limiter.Wait(ctx); err != nil { + return + } + + nsIdx := nsIndex % len(e.namespaces) + nsIndex++ + + input := e.buildWorkflowInput() + wfID := fmt.Sprintf("vs-%s-%s-%d", info.RunID, e.executionID, e.wfCounter.Add(1)) + + opts := client.StartWorkflowOptions{ + ID: wfID, + TaskQueue: e.taskQueue, + WorkflowExecutionTimeout: vsComputeTimeout(len(input.CSAUpdates)), + } + + // Fire and forget. + // TODO: better error handling (retry? circuit breaker?) + _, err := e.clients[nsIdx].ExecuteWorkflow(ctx, opts, "visibilityStressWorker", input) + if err != nil { + e.totalErrors.Add(1) + info.Logger.Warnf("Failed to start workflow: %v", err) + continue + } + + e.totalCreated.Add(1) + tickCount++ + + logEvery := int64(math.Max(1, e.config.Load.WfRPS)) + if tickCount%logEvery == 0 { + elapsed := time.Since(startTime) + info.Logger.Infof("[writer] t=%v created=%d errors=%d actual_rps=%.1f", + elapsed.Round(time.Second), e.totalCreated.Load(), + e.totalErrors.Load(), float64(e.totalCreated.Load())/elapsed.Seconds()) + } + } +} + +func (e *visibilityStressExecutor) runDeleters(ctx context.Context, info loadgen.ScenarioInfo) { + perNsDeleteRPS := e.config.Load.DeleteRPS / float64(len(e.namespaces)) + for i, ns := range e.namespaces { + i, ns := i, ns + go e.runDeleterForNamespace(ctx, info, i, ns, perNsDeleteRPS) + } +} + +// runDeleterForNamespace periodically lists terminal workflows (via the visibility store) +// and deletes them at the configured rate. +// The query is scoped by TaskQueue to avoid touching workflows from other runs. +func (e *visibilityStressExecutor) runDeleterForNamespace( + ctx context.Context, info loadgen.ScenarioInfo, + nsIdx int, ns string, deleteRPS float64, +) { + if deleteRPS <= 0 { + return + } + + tickInterval := time.Second + batchSize := int32(math.Max(1, deleteRPS)) + if deleteRPS < 1 { + tickInterval = time.Duration(float64(time.Second) / deleteRPS) + batchSize = 1 + } + + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() + + query := fmt.Sprintf( + "WorkflowType = 'visibilityStressWorker' AND ExecutionStatus != 'Running' AND TaskQueue = '%s'", + e.taskQueue) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + resp, err := e.clients[nsIdx].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: query, + PageSize: batchSize, + }) + if err != nil { + // TODO: better error handling + info.Logger.Warnf("[deleter/%s] List failed: %v", ns, err) + continue + } + + tickDeleteErrors := 0 + for _, exec := range resp.Executions { + wfExec := exec.Execution + _, err := e.clients[nsIdx].WorkflowService().DeleteWorkflowExecution(ctx, + &workflowservice.DeleteWorkflowExecutionRequest{ + Namespace: ns, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: wfExec.WorkflowId, + RunId: wfExec.RunId, + }, + }) + if err != nil { + // TODO: better error handling + tickDeleteErrors++ + info.Logger.Warnf("[deleter/%s] Delete %s failed: %v", ns, wfExec.WorkflowId, err) + continue + } + e.totalDeleted.Add(1) + } + + info.Logger.Infof("[deleter/%s] deleted=%d errors=%d found=%d", + ns, e.totalDeleted.Load(), tickDeleteErrors, len(resp.Executions)) + } +} + +// runQuerier issues List and Count workflow queries at the configured RPS. +// Query type is chosen by weighted random from the queryPreset distribution. +// List queries fetch up to 3 pages (page 1, 2, 3) to exercise pagination. +func (e *visibilityStressExecutor) runQuerier(ctx context.Context, info loadgen.ScenarioInfo) { + q := e.config.Query + totalRPS := q.CountRPS + q.ListRPS + if totalRPS <= 0 { + return + } + + limiter := rate.NewLimiter(rate.Limit(totalRPS), 1) + var nsIndex int + startTime := time.Now() + var queryErrors atomic.Int64 + + totalWeight := q.ListNoFilterWeight + q.ListOpenWeight + q.ListClosedWeight + + q.ListSimpleCSAWeight + q.ListCompoundCSAWeight + + // Log every ~1 second worth of queries, with a minimum of every 1s. + logInterval := int64(math.Max(1, totalRPS)) + lastLogTime := time.Now() + + for { + if err := limiter.Wait(ctx); err != nil { + return + } + + nsIdx := nsIndex % len(e.namespaces) + nsIndex++ + ns := e.namespaces[nsIdx] + + isCount := e.rng.Float64() < q.CountRPS/totalRPS + filter := e.generateFilter(isCount, totalWeight) + + var queryType string + if isCount { + queryType = "count" + _, err := e.clients[nsIdx].CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{ + Namespace: ns, + Query: filter, + }) + if err != nil { + // TODO: better error handling + queryErrors.Add(1) + info.Logger.Warnf("[querier] Count failed (filter=%s): %v", filter, err) + } + } else { + queryType = "list" + resp, err := e.clients[nsIdx].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: filter, + }) + if err != nil { + // TODO: better error handling + queryErrors.Add(1) + info.Logger.Warnf("[querier] List failed (filter=%s): %v", filter, err) + } else if len(resp.NextPageToken) > 0 { + resp2, err := e.clients[nsIdx].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: filter, + NextPageToken: resp.NextPageToken, + }) + if err == nil && len(resp2.NextPageToken) > 0 { + _, _ = e.clients[nsIdx].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: filter, + NextPageToken: resp2.NextPageToken, + }) + } + } + } + _ = queryType // used in debug logging below + + total := e.totalQueries.Add(1) + + // Periodic logging. + if total%logInterval == 0 || time.Since(lastLogTime) >= 5*time.Second { + elapsed := time.Since(startTime) + info.Logger.Infof("[querier] t=%v queries=%d errors=%d actual_rps=%.1f last=%s filter=%s", + elapsed.Round(time.Second), total, queryErrors.Load(), + float64(total)/elapsed.Seconds(), queryType, filter) + lastLogTime = time.Now() + } + } +} + +// --------------------------------------------------------------------------- +// Query Generation +// --------------------------------------------------------------------------- + +func (e *visibilityStressExecutor) generateFilter(isCount bool, totalWeight int) string { + if isCount { + switch e.rng.Intn(3) { + case 0: + return "WorkflowType = 'visibilityStressWorker'" + case 1: + return "ExecutionStatus = 'Running'" + default: + kwCSAs := e.csasByType(csaTypeKeyword) + if len(kwCSAs) > 0 { + csa := kwCSAs[e.rng.Intn(len(kwCSAs))] + val := keywordVocabulary[e.rng.Intn(len(keywordVocabulary))] + return fmt.Sprintf("%s = '%s'", csa.Name, val) + } + return "WorkflowType = 'visibilityStressWorker'" + } + } + + q := e.config.Query + roll := e.rng.Intn(totalWeight) + cumulative := 0 + + cumulative += q.ListNoFilterWeight + if roll < cumulative { + return "WorkflowType = 'visibilityStressWorker'" + } + + cumulative += q.ListOpenWeight + if roll < cumulative { + return "WorkflowType = 'visibilityStressWorker' AND ExecutionStatus = 'Running'" + } + + cumulative += q.ListClosedWeight + if roll < cumulative { + since := time.Now().Add(-24 * time.Hour).UTC().Format(time.RFC3339) + return fmt.Sprintf("ExecutionStatus != 'Running' AND CloseTime > '%s'", since) + } + + cumulative += q.ListSimpleCSAWeight + if roll < cumulative { + return e.generateSimpleCSAFilter() + } + + return e.generateCompoundCSAFilter() +} + +func (e *visibilityStressExecutor) generateSimpleCSAFilter() string { + csa := e.config.CSADefs[e.rng.Intn(len(e.config.CSADefs))] + return e.csaFilterClause(csa) +} + +func (e *visibilityStressExecutor) generateCompoundCSAFilter() string { + n := 2 + e.rng.Intn(2) + if n > len(e.config.CSADefs) { + n = len(e.config.CSADefs) + } + perm := e.rng.Perm(len(e.config.CSADefs)) + clauses := make([]string, n) + for i := 0; i < n; i++ { + clauses[i] = e.csaFilterClause(e.config.CSADefs[perm[i]]) + } + return strings.Join(clauses, " AND ") +} + +func (e *visibilityStressExecutor) csaFilterClause(csa csaDef) string { + switch csa.Type { + case csaTypeInt: + v := e.rng.Intn(1001) + if e.rng.Intn(2) == 0 { + return fmt.Sprintf("%s > %d", csa.Name, v) + } + lo, hi := e.rng.Intn(500), 500+e.rng.Intn(501) + return fmt.Sprintf("%s > %d AND %s < %d", csa.Name, lo, csa.Name, hi) + case csaTypeKeyword: + val := keywordVocabulary[e.rng.Intn(len(keywordVocabulary))] + return fmt.Sprintf("%s = '%s'", csa.Name, val) + case csaTypeBool: + if e.rng.Intn(2) == 0 { + return fmt.Sprintf("%s = true", csa.Name) + } + return fmt.Sprintf("%s = false", csa.Name) + case csaTypeDouble: + v := e.rng.Float64() * 1000.0 + return fmt.Sprintf("%s > %f", csa.Name, v) + case csaTypeText: + val := keywordVocabulary[e.rng.Intn(len(keywordVocabulary))] + return fmt.Sprintf("%s = '%s'", csa.Name, val) + case csaTypeDatetime: + offset := time.Duration(e.rng.Int63n(int64(30 * 24 * time.Hour))) + t := time.Now().Add(-offset).UTC().Format(time.RFC3339) + return fmt.Sprintf("%s > '%s'", csa.Name, t) + default: + return "WorkflowType = 'visibilityStressWorker'" + } +} + +func (e *visibilityStressExecutor) csasByType(t csaType) []csaDef { + var result []csaDef + for _, csa := range e.config.CSADefs { + if csa.Type == t { + result = append(result, csa) + } + } + return result +} + +// --------------------------------------------------------------------------- +// Workflow Input Builder +// --------------------------------------------------------------------------- + +// buildWorkflowInput constructs a VisibilityWorkerInput for one workflow. +// It resolves fractional updatesPerWF probabilistically, rolls the failure/timeout +// dice, and builds randomized CSA update groups from the csaPreset. +func (e *visibilityStressExecutor) buildWorkflowInput() *vstypes.VisibilityWorkerInput { + cfg := e.config.Load + + wholeUpdates := int(cfg.UpdatesPerWF) + fraction := cfg.UpdatesPerWF - float64(wholeUpdates) + numUpdates := wholeUpdates + if e.rng.Float64() < fraction { + numUpdates++ + } + + roll := e.rng.Float64() + shouldFail := roll < cfg.FailPercent + shouldTimeout := !shouldFail && roll < cfg.FailPercent+cfg.TimeoutPercent + + groups := make([]vstypes.CSAUpdateGroup, 0, numUpdates) + shuffled := make([]csaDef, len(e.config.CSADefs)) + copy(shuffled, e.config.CSADefs) + e.rng.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + + for i := 0; i < numUpdates; i++ { + groupSize := 1 + e.rng.Intn(3) + attrs := make(map[string]any, groupSize) + for j := 0; j < groupSize; j++ { + csa := shuffled[(i*3+j)%len(shuffled)] + attrs[csa.Name] = randomCSAValue(csa, e.rng) + } + groups = append(groups, vstypes.CSAUpdateGroup{Attributes: attrs}) + } + + return &vstypes.VisibilityWorkerInput{ + CSAUpdates: groups, + Delay: cfg.UpdateDelay, + ShouldFail: shouldFail, + ShouldTimeout: shouldTimeout, + } +} + +func vsComputeTimeout(numCSAUpdates int) time.Duration { + t := time.Duration(numCSAUpdates*2) * time.Second + if t < 5*time.Second { + t = 5 * time.Second + } + return t +} + +// --------------------------------------------------------------------------- +// Cleanup +// --------------------------------------------------------------------------- + +// runCleanup terminates all running workflows and deletes all workflows (terminal and running) +// for this scenario's task queue across all namespaces. Used with --option cleanup=true. +func (e *visibilityStressExecutor) runCleanup(ctx context.Context, info loadgen.ScenarioInfo) error { + info.Logger.Info("Running cleanup mode...") + + for i, ns := range e.namespaces { + info.Logger.Infof("Cleaning up namespace %s...", ns) + + // Terminate running workflows. + query := fmt.Sprintf( + "WorkflowType = 'visibilityStressWorker' AND ExecutionStatus = 'Running' AND TaskQueue = '%s'", + e.taskQueue) + for { + resp, err := e.clients[i].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: query, + PageSize: 100, + }) + if err != nil { + return fmt.Errorf("[cleanup/%s] list running failed: %w", ns, err) + } + if len(resp.Executions) == 0 { + break + } + for _, exec := range resp.Executions { + _ = e.clients[i].TerminateWorkflow(ctx, exec.Execution.WorkflowId, exec.Execution.RunId, "cleanup") + } + } + info.Logger.Infof("[cleanup/%s] Terminated all running workflows", ns) + + // Delete all workflows. + deleteQuery := fmt.Sprintf( + "WorkflowType = 'visibilityStressWorker' AND TaskQueue = '%s'", + e.taskQueue) + for { + resp, err := e.clients[i].ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: ns, + Query: deleteQuery, + PageSize: 100, + }) + if err != nil { + return fmt.Errorf("[cleanup/%s] list for deletion failed: %w", ns, err) + } + if len(resp.Executions) == 0 { + break + } + for _, exec := range resp.Executions { + wfExec := exec.Execution + _, err := e.clients[i].WorkflowService().DeleteWorkflowExecution(ctx, + &workflowservice.DeleteWorkflowExecutionRequest{ + Namespace: ns, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: wfExec.WorkflowId, + RunId: wfExec.RunId, + }, + }) + if err != nil { + info.Logger.Warnf("[cleanup/%s] delete %s failed: %v", ns, wfExec.WorkflowId, err) + } + } + } + info.Logger.Infof("[cleanup/%s] Deleted all workflows", ns) + } + + if e.config.DeleteNamespaces && e.config.NamespaceCount > 1 { + info.Logger.Warn("Namespace deletion not supported via API in all environments. Delete manually if needed.") + } + + info.Logger.Info("Cleanup complete.") + return nil +} diff --git a/scenarios/visibility_stress_test.go b/scenarios/visibility_stress_test.go new file mode 100644 index 00000000..a3ece132 --- /dev/null +++ b/scenarios/visibility_stress_test.go @@ -0,0 +1,317 @@ +package scenarios + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/temporalio/omes/cmd/clioptions" + "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/workers" +) + +// TestVisibilityStressConfigure tests the configuration/validation logic without +// running any workflows. +func TestVisibilityStressConfigure(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + options map[string]string + config loadgen.RunConfiguration + expectError string + }{ + { + name: "no presets → error", + options: map[string]string{}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "at least one of loadPreset or queryPreset must be set", + }, + { + name: "iterations not supported", + options: map[string]string{"loadPreset": "light"}, + config: loadgen.RunConfiguration{Iterations: 10}, + expectError: "does not support --iterations", + }, + { + name: "duration required", + options: map[string]string{"loadPreset": "light"}, + config: loadgen.RunConfiguration{}, + expectError: "requires --duration", + }, + { + name: "unknown loadPreset", + options: map[string]string{"loadPreset": "nonexistent"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "unknown loadPreset", + }, + { + name: "unknown queryPreset", + options: map[string]string{"queryPreset": "nonexistent"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "unknown queryPreset", + }, + { + name: "unknown csaPreset", + options: map[string]string{"loadPreset": "light", "csaPreset": "nonexistent"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "unknown csaPreset", + }, + { + name: "read-only without csaPreset → error", + options: map[string]string{"queryPreset": "light"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "csaPreset is required in read-only mode", + }, + { + name: "failPercent + timeoutPercent >= 1.0 → error", + options: map[string]string{"loadPreset": "light", "failPercent": "0.6", "timeoutPercent": "0.5"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "failPercent + timeoutPercent must be < 1.0", + }, + { + name: "retention too short → error", + options: map[string]string{"loadPreset": "light", "retention": "1h"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + expectError: "retention must be >= 24h", + }, + { + name: "valid write-only config", + options: map[string]string{"loadPreset": "light"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + }, + { + name: "valid read-only config", + options: map[string]string{"queryPreset": "light", "csaPreset": "small"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + }, + { + name: "valid full config with overrides", + options: map[string]string{"loadPreset": "moderate", "queryPreset": "light", "csaPreset": "heavy", "wfRPS": "50", "deleteRPS": "0"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + }, + { + name: "cleanup mode skips preset validation", + options: map[string]string{"cleanup": "true"}, + config: loadgen.RunConfiguration{}, + }, + { + name: "loadPreset defaults csaPreset to medium", + options: map[string]string{"loadPreset": "light"}, + config: loadgen.RunConfiguration{Duration: 1 * time.Minute}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + executor := &visibilityStressExecutor{} + info := loadgen.ScenarioInfo{ + RunID: "test-run", + Configuration: tc.config, + ScenarioOptions: tc.options, + Namespace: "default", + } + err := executor.Configure(info) + if tc.expectError != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expectError) + } else { + require.NoError(t, err) + } + }) + } +} + +// TestCSAParsing tests the CSA name prefix parsing logic. +func TestCSAParsing(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input string + expectType csaType + expectError bool + }{ + {"int", "VS_Int_01", csaTypeInt, false}, + {"keyword", "VS_Keyword_01", csaTypeKeyword, false}, + {"bool", "VS_Bool_01", csaTypeBool, false}, + {"double", "VS_Double_01", csaTypeDouble, false}, + {"text", "VS_Text_01", csaTypeText, false}, + {"datetime", "VS_Datetime_01", csaTypeDatetime, false}, + {"unknown prefix", "VS_Foo_01", 0, true}, + {"no prefix", "SomeAttribute", 0, true}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + def, err := parseCSAName(tc.input) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tc.expectType, def.Type) + assert.Equal(t, tc.input, def.Name) + } + }) + } +} + +// TestBuildWorkflowInput tests the workflow input builder logic. +func TestBuildWorkflowInput(t *testing.T) { + t.Parallel() + + executor := &visibilityStressExecutor{ + config: &vsConfig{ + Load: &vsLoadPreset{ + UpdatesPerWF: 3, + UpdateDelay: 100 * time.Millisecond, + FailPercent: 0, + TimeoutPercent: 0, + }, + CSADefs: []csaDef{ + {Name: "VS_Int_01", Type: csaTypeInt}, + {Name: "VS_Keyword_01", Type: csaTypeKeyword}, + {Name: "VS_Bool_01", Type: csaTypeBool}, + }, + }, + rng: rand.New(rand.NewSource(42)), + } + + input := executor.buildWorkflowInput() + + // With updatesPerWF=3 (no fractional part), we should get exactly 3 groups. + assert.Equal(t, 3, len(input.CSAUpdates)) + assert.Equal(t, 100*time.Millisecond, input.Delay) + assert.False(t, input.ShouldFail) + assert.False(t, input.ShouldTimeout) + + // Each group should have 1-3 attributes. + for _, group := range input.CSAUpdates { + assert.GreaterOrEqual(t, len(group.Attributes), 1) + assert.LessOrEqual(t, len(group.Attributes), 3) + } +} + +// TestBuildWorkflowInputFractional tests fractional updatesPerWF. +func TestBuildWorkflowInputFractional(t *testing.T) { + t.Parallel() + + executor := &visibilityStressExecutor{ + config: &vsConfig{ + Load: &vsLoadPreset{ + UpdatesPerWF: 0.5, // 50% get 1 update, 50% get 0 + UpdateDelay: time.Second, + FailPercent: 0, + TimeoutPercent: 0, + }, + CSADefs: []csaDef{ + {Name: "VS_Int_01", Type: csaTypeInt}, + }, + }, + rng: rand.New(rand.NewSource(42)), + } + + // Generate many inputs and check the distribution. + var withUpdates, withoutUpdates int + for i := 0; i < 1000; i++ { + input := executor.buildWorkflowInput() + if len(input.CSAUpdates) > 0 { + withUpdates++ + } else { + withoutUpdates++ + } + } + + // With 0.5, roughly half should have updates. Allow wide margin. + assert.InDelta(t, 500, withUpdates, 100, "expected ~50%% to have updates") + assert.InDelta(t, 500, withoutUpdates, 100, "expected ~50%% to have no updates") +} + +// TestQueryGeneration tests that generated queries reference only CSAs from the preset. +func TestQueryGeneration(t *testing.T) { + t.Parallel() + + executor := &visibilityStressExecutor{ + config: &vsConfig{ + Query: &vsQueryPreset{ + CountRPS: 1, + ListRPS: 1, + ListNoFilterWeight: 1, + ListOpenWeight: 1, + ListClosedWeight: 1, + ListSimpleCSAWeight: 5, // bias toward CSA filters + ListCompoundCSAWeight: 5, + }, + CSADefs: []csaDef{ + {Name: "VS_Int_01", Type: csaTypeInt}, + {Name: "VS_Keyword_01", Type: csaTypeKeyword}, + }, + }, + rng: rand.New(rand.NewSource(42)), + } + + totalWeight := 1 + 1 + 1 + 5 + 5 + + // Generate many queries and verify they only reference known CSAs. + for i := 0; i < 100; i++ { + filter := executor.generateFilter(false, totalWeight) + + // Should never reference VS_Int_02, VS_Keyword_02, etc. (not in preset). + assert.NotContains(t, filter, "VS_Int_02") + assert.NotContains(t, filter, "VS_Keyword_02") + assert.NotContains(t, filter, "VS_Bool_") + assert.NotContains(t, filter, "VS_Double_") + assert.NotContains(t, filter, "VS_Text_") + assert.NotContains(t, filter, "VS_Datetime_") + } +} + +// TestComputeTimeout tests the execution timeout calculation. +func TestComputeTimeout(t *testing.T) { + t.Parallel() + + assert.Equal(t, 5*time.Second, vsComputeTimeout(0)) + assert.Equal(t, 5*time.Second, vsComputeTimeout(1)) + assert.Equal(t, 5*time.Second, vsComputeTimeout(2)) + assert.Equal(t, 6*time.Second, vsComputeTimeout(3)) + assert.Equal(t, 20*time.Second, vsComputeTimeout(10)) +} + +// TestVisibilityStressWriteOnly runs the full scenario in write-only mode +// against a real dev server. This is the integration test. +func TestVisibilityStressWriteOnly(t *testing.T) { + t.Parallel() + + env := workers.SetupTestEnvironment(t, + workers.WithExecutorTimeout(1*time.Minute)) + + executor := &visibilityStressExecutor{} + scenarioInfo := loadgen.ScenarioInfo{ + RunID: fmt.Sprintf("vs-test-%d", time.Now().Unix()), + Configuration: loadgen.RunConfiguration{ + Duration: 5 * time.Second, + }, + ScenarioOptions: map[string]string{ + "loadPreset": "light", + "csaPreset": "small", + "wfRPS": "5", // low rate for test speed + "updatesPerWF": "2", // few updates per WF + "deleteRPS": "0", // no deletes (keep test simple) + "failPercent": "0", // no failures + "timeoutPercent": "0", // no timeouts + }, + } + + _, err := env.RunExecutorTest(t, executor, scenarioInfo, clioptions.LangGo) + require.NoError(t, err, "Executor should complete successfully") + + // Verify some workflows were created. + require.Greater(t, executor.totalCreated.Load(), int64(0), + "Should have created at least one workflow") + + t.Logf("Created %d workflows, errors: %d", + executor.totalCreated.Load(), executor.totalErrors.Load()) +} diff --git a/workers/go/visibility/workflow.go b/workers/go/visibility/workflow.go new file mode 100644 index 00000000..39a591d2 --- /dev/null +++ b/workers/go/visibility/workflow.go @@ -0,0 +1,140 @@ +// Package visibility implements the visibilityStressWorker workflow for the +// visibility stress test scenario. +// +// This workflow is intentionally simple: it receives a list of CSA update +// instructions, executes them with sleeps in between, then completes (or +// intentionally fails/times out). The executor controls all the complexity; +// the workflow just follows instructions. +package visibility + +import ( + "fmt" + "strings" + "time" + + vstypes "github.com/temporalio/omes/loadgen/visibility" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +// VisibilityStressWorkerWorkflow processes CSA update instructions from its input, +// then reaches a terminal state (completed, failed, or timed out). +// +// Each CSA update group triggers one UpsertTypedSearchAttributes call, with a +// configurable delay between updates. After all updates: +// - If ShouldTimeout: sleeps for 24h (execution timeout kills it → TimedOut status) +// - If ShouldFail: returns an ApplicationError → Failed status +// - Otherwise: returns nil → Completed status +func VisibilityStressWorkerWorkflow(ctx workflow.Context, input *vstypes.VisibilityWorkerInput) error { + if input == nil { + return nil + } + + // Execute each CSA update group sequentially with delays between them. + for i, group := range input.CSAUpdates { + typedAttrs, err := buildTypedSearchAttributes(group.Attributes) + if err != nil { + return fmt.Errorf("failed to build typed attributes for update %d: %w", i, err) + } + if err := workflow.UpsertTypedSearchAttributes(ctx, typedAttrs...); err != nil { + return fmt.Errorf("failed to upsert search attributes (update %d): %w", i, err) + } + // Sleep between updates (but not after the last one). + if i < len(input.CSAUpdates)-1 && input.Delay > 0 { + _ = workflow.Sleep(ctx, input.Delay) + } + } + + // Intentional timeout: sleep for 24h, which exceeds the execution timeout + // set by the executor. The server will terminate the workflow with TimedOut status. + if input.ShouldTimeout { + _ = workflow.Sleep(ctx, 24*time.Hour) + } + + // Intentional failure: return an application error to produce a Failed status. + if input.ShouldFail { + return temporal.NewApplicationError("intentional failure", "VS_INTENTIONAL") + } + + return nil +} + +// buildTypedSearchAttributes converts a raw map[string]any (from JSON deserialization) +// into typed search attribute updates that the Temporal SDK requires. +// +// The type of each CSA is inferred from its name prefix: +// - VS_Int_* → int64 (JSON float64 → int64 cast) +// - VS_Keyword_* → string +// - VS_Bool_* → bool +// - VS_Double_* → float64 +// - VS_Text_* → string (uses KeyString in Go SDK for Text SA type) +// - VS_Datetime_* → time.Time (parsed from RFC3339 string) +func buildTypedSearchAttributes(attrs map[string]any) ([]temporal.SearchAttributeUpdate, error) { + updates := make([]temporal.SearchAttributeUpdate, 0, len(attrs)) + for name, val := range attrs { + update, err := buildOneAttribute(name, val) + if err != nil { + return nil, fmt.Errorf("CSA %q: %w", name, err) + } + updates = append(updates, update) + } + return updates, nil +} + +// buildOneAttribute creates a single typed search attribute update by dispatching +// on the CSA name prefix. +func buildOneAttribute(name string, val any) (temporal.SearchAttributeUpdate, error) { + switch { + case strings.HasPrefix(name, "VS_Int_"): + // JSON numbers arrive as float64; cast to int64 for the Int SA type. + f, ok := val.(float64) + if !ok { + return nil, fmt.Errorf("expected float64 (JSON number) for Int CSA, got %T", val) + } + return temporal.NewSearchAttributeKeyInt64(name).ValueSet(int64(f)), nil + + case strings.HasPrefix(name, "VS_Keyword_"): + s, ok := val.(string) + if !ok { + return nil, fmt.Errorf("expected string for Keyword CSA, got %T", val) + } + return temporal.NewSearchAttributeKeyKeyword(name).ValueSet(s), nil + + case strings.HasPrefix(name, "VS_Bool_"): + b, ok := val.(bool) + if !ok { + return nil, fmt.Errorf("expected bool for Bool CSA, got %T", val) + } + return temporal.NewSearchAttributeKeyBool(name).ValueSet(b), nil + + case strings.HasPrefix(name, "VS_Double_"): + f, ok := val.(float64) + if !ok { + return nil, fmt.Errorf("expected float64 for Double CSA, got %T", val) + } + return temporal.NewSearchAttributeKeyFloat64(name).ValueSet(f), nil + + case strings.HasPrefix(name, "VS_Text_"): + // Go SDK uses NewSearchAttributeKeyString for Text SA type. + s, ok := val.(string) + if !ok { + return nil, fmt.Errorf("expected string for Text CSA, got %T", val) + } + return temporal.NewSearchAttributeKeyString(name).ValueSet(s), nil + + case strings.HasPrefix(name, "VS_Datetime_"): + // Datetime values are serialized as RFC3339 strings in JSON. + s, ok := val.(string) + if !ok { + return nil, fmt.Errorf("expected string (RFC3339) for Datetime CSA, got %T", val) + } + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return nil, fmt.Errorf("failed to parse Datetime CSA value %q: %w", s, err) + } + return temporal.NewSearchAttributeKeyTime(name).ValueSet(t), nil + + default: + return nil, fmt.Errorf("unrecognized CSA prefix in %q", name) + } +} diff --git a/workers/go/worker/worker.go b/workers/go/worker/worker.go index 159b192b..c27905e4 100644 --- a/workers/go/worker/worker.go +++ b/workers/go/worker/worker.go @@ -9,6 +9,7 @@ import ( "github.com/temporalio/omes/workers/go/ebbandflow" "github.com/temporalio/omes/workers/go/kitchensink" "github.com/temporalio/omes/workers/go/schedulerstress" + "github.com/temporalio/omes/workers/go/visibility" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" @@ -110,6 +111,7 @@ func runWorkers(client client.Client, taskQueues []string, options clioptions.Wo w.RegisterWorkflow(kitchensink.WaitForCancelWorkflow) w.RegisterWorkflowWithOptions(ebbandflow.EbbAndFlowTrackWorkflow, workflow.RegisterOptions{Name: "ebbAndFlowTrack"}) w.RegisterActivity(&ebbFlowActivities) + w.RegisterWorkflowWithOptions(visibility.VisibilityStressWorkerWorkflow, workflow.RegisterOptions{Name: "visibilityStressWorker"}) w.RegisterWorkflowWithOptions(schedulerstress.NoopScheduledWorkflow, workflow.RegisterOptions{Name: "NoopScheduledWorkflow"}) w.RegisterWorkflowWithOptions(schedulerstress.SleepScheduledWorkflow, workflow.RegisterOptions{Name: "SleepScheduledWorkflow"}) w.RegisterNexusService(service)