diff --git a/scenarios/ebb_and_flow.go b/scenarios/ebb_and_flow.go index 3ef39bef..51242e8b 100644 --- a/scenarios/ebb_and_flow.go +++ b/scenarios/ebb_and_flow.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "math" "math/rand" "sync" @@ -37,6 +38,18 @@ const ( MaxConsecutiveErrorsFlag = "max-consecutive-errors" // BacklogLogIntervalFlag defines how often the current backlog stats are logged. BacklogLogIntervalFlag = "backlog-log-interval" + // MinAddRateFlag defines the minimum add rate in activities/sec. When both + // min-add-rate and max-add-rate are set, the scenario operates in rate mode. + MinAddRateFlag = "min-add-rate" + // MaxAddRateFlag defines the maximum add rate in activities/sec. + MaxAddRateFlag = "max-add-rate" + // RatePeriodFlag defines the period of rate oscillation. + RatePeriodFlag = "rate-period" + // BacklogPeriodFlag defines the period of backlog oscillation. Defaults to + // the value of PeriodFlag for backwards compatibility. + BacklogPeriodFlag = "backlog-period" + // BatchSizeFlag defines the max activities per workflow (0 = unlimited). + BatchSizeFlag = "batch-size" ) type ebbAndFlowConfig struct { @@ -50,6 +63,13 @@ type ebbAndFlowConfig struct { BacklogLogInterval time.Duration VisibilityVerificationTimeout time.Duration SleepActivityConfig *loadgen.SleepActivityConfig + // Rate mode: enabled when MinAddRate and MaxAddRate are both set. + RateMode bool + MinAddRate float64 + MaxAddRate float64 + RatePeriod time.Duration + BacklogPeriod time.Duration + BatchSize int64 } type ebbAndFlowState struct { @@ -80,6 +100,8 @@ func init() { "Options:\n" + " min-backlog, max-backlog, period, sleep-duration, max-rate,\n" + " control-interval, max-consecutive-errors, backlog-log-interval.\n" + + "Rate mode (set min-add-rate and max-add-rate to enable):\n" + + " min-add-rate, max-add-rate, rate-period, backlog-period, batch-size.\n" + "Duration must be set.", ExecutorFn: func() loadgen.Executor { return newEbbAndFlowExecutor() }, }) @@ -90,30 +112,40 @@ func newEbbAndFlowExecutor() *ebbAndFlowExecutor { } func (e *ebbAndFlowExecutor) Configure(info loadgen.ScenarioInfo) error { + // TODO: backwards-compatibility, remove later + pt := info.ScenarioOptionDuration("phase-time", 60*time.Second) + period := info.ScenarioOptionDuration(PeriodFlag, pt) + if period <= 0 { + return fmt.Errorf("period must be greater than 0, got %v", period) + } + config := &ebbAndFlowConfig{ + MinBacklog: int64(info.ScenarioOptionInt(MinBacklogFlag, 0)), + MaxBacklog: int64(info.ScenarioOptionInt(MaxBacklogFlag, 30)), + Period: period, SleepDuration: info.ScenarioOptionDuration(SleepDurationFlag, 1*time.Millisecond), MaxRate: int64(info.ScenarioOptionInt(MaxRateFlag, 1000)), ControlInterval: info.ScenarioOptionDuration(ControlIntervalFlag, 100*time.Millisecond), MaxConsecutiveErrors: info.ScenarioOptionInt(MaxConsecutiveErrorsFlag, 10), BacklogLogInterval: info.ScenarioOptionDuration(BacklogLogIntervalFlag, 30*time.Second), VisibilityVerificationTimeout: info.ScenarioOptionDuration(VisibilityVerificationTimeoutFlag, 30*time.Second), + MinAddRate: info.ScenarioOptionFloat(MinAddRateFlag, 0), + MaxAddRate: info.ScenarioOptionFloat(MaxAddRateFlag, 0), + RatePeriod: info.ScenarioOptionDuration(RatePeriodFlag, 10*time.Minute), + BacklogPeriod: info.ScenarioOptionDuration(BacklogPeriodFlag, period), + BatchSize: int64(info.ScenarioOptionInt(BatchSizeFlag, 0)), } - config.MinBacklog = int64(info.ScenarioOptionInt(MinBacklogFlag, 0)) if config.MinBacklog < 0 { return fmt.Errorf("min-backlog must be non-negative, got %d", config.MinBacklog) } - - config.MaxBacklog = int64(info.ScenarioOptionInt(MaxBacklogFlag, 30)) if config.MaxBacklog <= config.MinBacklog { return fmt.Errorf("max-backlog must be greater than min-backlog, got max=%d min=%d", config.MaxBacklog, config.MinBacklog) } - // TODO: backwards-compatibility, remove later - pt := info.ScenarioOptionDuration("phase-time", 60*time.Second) - config.Period = info.ScenarioOptionDuration(PeriodFlag, pt) - if config.Period <= 0 { - return fmt.Errorf("period must be greater than 0, got %v", config.Period) + config.RateMode = config.MinAddRate > 0 && config.MaxAddRate > 0 + if config.RateMode && config.MaxAddRate <= config.MinAddRate { + return fmt.Errorf("max-add-rate must be greater than min-add-rate, got max=%v min=%v", config.MaxAddRate, config.MinAddRate) } if sleepActivitiesStr, ok := info.ScenarioOptions[SleepActivityJsonFlag]; ok { @@ -169,10 +201,21 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) var startWG sync.WaitGroup var iter int64 = 1 - e.Logger.Infof("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, period=%v, duration=%v", - config.MinBacklog, config.MaxBacklog, config.Period, e.Configuration.Duration) + if config.RateMode { + e.Logger.Infof("Starting ebb and flow scenario (rate mode): min_add_rate=%.1f, max_add_rate=%.1f, rate_period=%v, "+ + "min_backlog=%d, max_backlog=%d, backlog_period=%v, batch_size=%d, duration=%v", + config.MinAddRate, config.MaxAddRate, config.RatePeriod, + config.MinBacklog, config.MaxBacklog, config.BacklogPeriod, + config.BatchSize, e.Configuration.Duration) + } else { + e.Logger.Infof("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, period=%v, duration=%v", + config.MinBacklog, config.MaxBacklog, config.Period, e.Configuration.Duration) + } + + e.RegisterDefaultSearchAttributes(ctx) - var started, completed, backlog, target, activities int64 + var started, completed, backlog, target int64 + lastIteration := time.Now() for elapsed := time.Duration(0); elapsed < e.Configuration.Duration; elapsed = time.Since(e.startTime) { select { @@ -193,21 +236,29 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) completed = e.completedActivities.Load() backlog = started - completed - target = calculateBacklogTarget(elapsed, config.Period, config.MinBacklog, config.MaxBacklog) - activities = target - backlog - activities = max(activities, 0) - activities = min(activities, config.MaxRate) - if activities > 0 { - startWG.Add(1) - go func(iter, activities int64) { - defer startWG.Done() - errCh <- e.spawnWorkflowWithActivities(ctx, iter, activities, config.SleepActivityConfig) - }(iter, activities) + backlogTarget := calculateSineTarget( + elapsed, config.BacklogPeriod, float64(config.MinBacklog), float64(config.MaxBacklog)) + target = int64(math.Round(backlogTarget)) + deficit := math.Max(0, backlogTarget-float64(backlog)) + + if config.RateMode { + rateTarget := calculateSineTarget(elapsed, config.RatePeriod, config.MinAddRate, config.MaxAddRate) + now := time.Now() + rateActivities := rateTarget * now.Sub(lastIteration).Seconds() + lastIteration = now + deficit = math.Max(deficit, rateActivities) + } + toStart := int64(math.Round(deficit)) + toStart = min(toStart, config.MaxRate) + for batch := range batches(toStart, config.BatchSize) { + startWG.Go(func() { + errCh <- e.spawnWorkflowWithActivities(ctx, iter, batch, config.SleepActivityConfig) + }) iter++ } case <-backlogTicker.C: - e.Logger.Debugf("Backlog: %d, target: %d, last iter: %d, started: %d, completed: %d", - backlog, target, activities, started, completed) + e.Logger.Infof("Backlog: %d, target: %d, started: %d, completed: %d", + backlog, target, started, completed) } } @@ -281,7 +332,6 @@ func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities( // Start workflow. run := e.NewRun(int(iteration)) - e.RegisterDefaultSearchAttributes(ctx) options := run.DefaultStartWorkflowOptions() options.ID = fmt.Sprintf("%s-track-%d", e.id, iteration) options.WorkflowExecutionErrorWhenAlreadyStarted = false @@ -324,9 +374,28 @@ func calculateBacklogTarget( elapsed, period time.Duration, minBacklog, maxBacklog int64, ) int64 { + return int64(math.Round(calculateSineTarget(elapsed, period, float64(minBacklog), float64(maxBacklog)))) +} + +func calculateSineTarget(elapsed, period time.Duration, minVal, maxVal float64) float64 { periods := elapsed.Seconds() / period.Seconds() osc := (math.Sin(2*math.Pi*(periods-0.25)) + 1.0) / 2 - backlogRange := float64(maxBacklog - minBacklog) - baseTarget := float64(minBacklog) + osc*backlogRange - return int64(math.Round(baseTarget)) + return minVal + osc*(maxVal-minVal) +} + +// batches yields batch sizes that partition total into chunks of at most +// batchSize. If batchSize <= 0, yields total as a single batch. +func batches(total, batchSize int64) iter.Seq[int64] { + return func(yield func(int64) bool) { + for total > 0 { + batch := total + if batchSize > 0 { + batch = min(batch, batchSize) + } + total -= batch + if !yield(batch) { + return + } + } + } } diff --git a/scenarios/ebb_and_flow_test.go b/scenarios/ebb_and_flow_test.go index 443925af..1485aff7 100644 --- a/scenarios/ebb_and_flow_test.go +++ b/scenarios/ebb_and_flow_test.go @@ -11,6 +11,38 @@ import ( "github.com/temporalio/omes/workers" ) +func TestEbbAndFlowRateMode(t *testing.T) { + t.Parallel() + + env := workers.SetupTestEnvironment(t, + workers.WithExecutorTimeout(2*time.Minute)) + + scenarioInfo := loadgen.ScenarioInfo{ + RunID: fmt.Sprintf("eaf-rate-%d", time.Now().Unix()), + Configuration: loadgen.RunConfiguration{ + Duration: 10 * time.Second, + }, + ScenarioOptions: map[string]string{ + MinAddRateFlag: "1", + MaxAddRateFlag: "3", + RatePeriodFlag: "5s", + MinBacklogFlag: "0", + MaxBacklogFlag: "3", + BacklogPeriodFlag: "5s", + PeriodFlag: "5s", + BacklogLogIntervalFlag: "5s", + VisibilityVerificationTimeoutFlag: "5s", + }, + } + + executor := newEbbAndFlowExecutor() + _, err := env.RunExecutorTest(t, executor, scenarioInfo, clioptions.LangGo) + require.NoError(t, err, "Rate mode executor should complete successfully") + + state := executor.Snapshot().(ebbAndFlowState) + require.GreaterOrEqual(t, state.TotalCompletedWorkflows, int64(1)) +} + func TestEbbAndFlow(t *testing.T) { t.Parallel() diff --git a/workers/run.go b/workers/run.go index 9ca84150..d6dd0eb4 100644 --- a/workers/run.go +++ b/workers/run.go @@ -130,7 +130,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { // The process metrics sidecar (with /info endpoint) is started by run.go, not the worker. args = append(args, passthrough(r.ClientOptions.FlagSet(), "")...) args = append(args, passthrough(r.LoggingOptions.FlagSet(), "")...) - args = append(args, passthroughExcluding(r.MetricsOptions.FlagSet("worker-"), "worker-", "process-metrics-address", "metrics-version-tag")...) + args = append(args, passthrough(r.MetricsOptions.FlagSet("worker-"), "worker-", "process-metrics-address", "metrics-version-tag")...) args = append(args, passthrough(r.WorkerOptions.FlagSet(), "worker-")...) cmd, err := prog.NewCommand(context.Background(), args...) @@ -208,20 +208,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error { } } -func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) { - fs.VisitAll(func(f *pflag.Flag) { - if !f.Changed { - return - } - flags = append(flags, fmt.Sprintf("--%s=%s", - strings.TrimPrefix(f.Name, prefix), - f.Value.String(), - )) - }) - return -} - -func passthroughExcluding(fs *pflag.FlagSet, prefix string, exclude ...string) (flags []string) { +func passthrough(fs *pflag.FlagSet, prefix string, exclude ...string) (flags []string) { excludeSet := make(map[string]bool) for _, e := range exclude { excludeSet[e] = true