Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 96 additions & 27 deletions scenarios/ebb_and_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"math"
"math/rand"
"sync"
Expand Down Expand Up @@ -37,6 +38,18 @@ const (
MaxConsecutiveErrorsFlag = "max-consecutive-errors"
// BacklogLogIntervalFlag defines how often the current backlog stats are logged.
BacklogLogIntervalFlag = "backlog-log-interval"
// MinAddRateFlag defines the minimum add rate in activities/sec. When both
// min-add-rate and max-add-rate are set, the scenario operates in rate mode.
MinAddRateFlag = "min-add-rate"
// MaxAddRateFlag defines the maximum add rate in activities/sec.
MaxAddRateFlag = "max-add-rate"
// RatePeriodFlag defines the period of rate oscillation.
RatePeriodFlag = "rate-period"
// BacklogPeriodFlag defines the period of backlog oscillation. Defaults to
// the value of PeriodFlag for backwards compatibility.
BacklogPeriodFlag = "backlog-period"
// BatchSizeFlag defines the max activities per workflow (0 = unlimited).
BatchSizeFlag = "batch-size"
)

type ebbAndFlowConfig struct {
Expand All @@ -50,6 +63,13 @@ type ebbAndFlowConfig struct {
BacklogLogInterval time.Duration
VisibilityVerificationTimeout time.Duration
SleepActivityConfig *loadgen.SleepActivityConfig
// Rate mode: enabled when MinAddRate and MaxAddRate are both set.
RateMode bool
MinAddRate float64
MaxAddRate float64
RatePeriod time.Duration
BacklogPeriod time.Duration
BatchSize int64
}

type ebbAndFlowState struct {
Expand Down Expand Up @@ -80,6 +100,8 @@ func init() {
"Options:\n" +
" min-backlog, max-backlog, period, sleep-duration, max-rate,\n" +
" control-interval, max-consecutive-errors, backlog-log-interval.\n" +
"Rate mode (set min-add-rate and max-add-rate to enable):\n" +
" min-add-rate, max-add-rate, rate-period, backlog-period, batch-size.\n" +
"Duration must be set.",
ExecutorFn: func() loadgen.Executor { return newEbbAndFlowExecutor() },
})
Expand All @@ -90,30 +112,40 @@ func newEbbAndFlowExecutor() *ebbAndFlowExecutor {
}

func (e *ebbAndFlowExecutor) Configure(info loadgen.ScenarioInfo) error {
// TODO: backwards-compatibility, remove later
pt := info.ScenarioOptionDuration("phase-time", 60*time.Second)
period := info.ScenarioOptionDuration(PeriodFlag, pt)
if period <= 0 {
return fmt.Errorf("period must be greater than 0, got %v", period)
}

config := &ebbAndFlowConfig{
MinBacklog: int64(info.ScenarioOptionInt(MinBacklogFlag, 0)),
MaxBacklog: int64(info.ScenarioOptionInt(MaxBacklogFlag, 30)),
Period: period,
SleepDuration: info.ScenarioOptionDuration(SleepDurationFlag, 1*time.Millisecond),
MaxRate: int64(info.ScenarioOptionInt(MaxRateFlag, 1000)),
ControlInterval: info.ScenarioOptionDuration(ControlIntervalFlag, 100*time.Millisecond),
MaxConsecutiveErrors: info.ScenarioOptionInt(MaxConsecutiveErrorsFlag, 10),
BacklogLogInterval: info.ScenarioOptionDuration(BacklogLogIntervalFlag, 30*time.Second),
VisibilityVerificationTimeout: info.ScenarioOptionDuration(VisibilityVerificationTimeoutFlag, 30*time.Second),
MinAddRate: info.ScenarioOptionFloat(MinAddRateFlag, 0),
MaxAddRate: info.ScenarioOptionFloat(MaxAddRateFlag, 0),
RatePeriod: info.ScenarioOptionDuration(RatePeriodFlag, 10*time.Minute),
BacklogPeriod: info.ScenarioOptionDuration(BacklogPeriodFlag, period),
BatchSize: int64(info.ScenarioOptionInt(BatchSizeFlag, 0)),
}

config.MinBacklog = int64(info.ScenarioOptionInt(MinBacklogFlag, 0))
if config.MinBacklog < 0 {
return fmt.Errorf("min-backlog must be non-negative, got %d", config.MinBacklog)
}

config.MaxBacklog = int64(info.ScenarioOptionInt(MaxBacklogFlag, 30))
if config.MaxBacklog <= config.MinBacklog {
return fmt.Errorf("max-backlog must be greater than min-backlog, got max=%d min=%d", config.MaxBacklog, config.MinBacklog)
}

// TODO: backwards-compatibility, remove later
pt := info.ScenarioOptionDuration("phase-time", 60*time.Second)
config.Period = info.ScenarioOptionDuration(PeriodFlag, pt)
if config.Period <= 0 {
return fmt.Errorf("period must be greater than 0, got %v", config.Period)
config.RateMode = config.MinAddRate > 0 && config.MaxAddRate > 0
if config.RateMode && config.MaxAddRate <= config.MinAddRate {
return fmt.Errorf("max-add-rate must be greater than min-add-rate, got max=%v min=%v", config.MaxAddRate, config.MinAddRate)
}

if sleepActivitiesStr, ok := info.ScenarioOptions[SleepActivityJsonFlag]; ok {
Expand Down Expand Up @@ -169,10 +201,21 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo)
var startWG sync.WaitGroup
var iter int64 = 1

e.Logger.Infof("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, period=%v, duration=%v",
config.MinBacklog, config.MaxBacklog, config.Period, e.Configuration.Duration)
if config.RateMode {
e.Logger.Infof("Starting ebb and flow scenario (rate mode): min_add_rate=%.1f, max_add_rate=%.1f, rate_period=%v, "+
"min_backlog=%d, max_backlog=%d, backlog_period=%v, batch_size=%d, duration=%v",
config.MinAddRate, config.MaxAddRate, config.RatePeriod,
config.MinBacklog, config.MaxBacklog, config.BacklogPeriod,
config.BatchSize, e.Configuration.Duration)
} else {
e.Logger.Infof("Starting ebb and flow scenario: min_backlog=%d, max_backlog=%d, period=%v, duration=%v",
config.MinBacklog, config.MaxBacklog, config.Period, e.Configuration.Duration)
}

e.RegisterDefaultSearchAttributes(ctx)

var started, completed, backlog, target, activities int64
var started, completed, backlog, target int64
lastIteration := time.Now()

for elapsed := time.Duration(0); elapsed < e.Configuration.Duration; elapsed = time.Since(e.startTime) {
select {
Expand All @@ -193,21 +236,29 @@ func (e *ebbAndFlowExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo)
completed = e.completedActivities.Load()
backlog = started - completed

target = calculateBacklogTarget(elapsed, config.Period, config.MinBacklog, config.MaxBacklog)
activities = target - backlog
activities = max(activities, 0)
activities = min(activities, config.MaxRate)
if activities > 0 {
startWG.Add(1)
go func(iter, activities int64) {
defer startWG.Done()
errCh <- e.spawnWorkflowWithActivities(ctx, iter, activities, config.SleepActivityConfig)
}(iter, activities)
backlogTarget := calculateSineTarget(
elapsed, config.BacklogPeriod, float64(config.MinBacklog), float64(config.MaxBacklog))
target = int64(math.Round(backlogTarget))
deficit := math.Max(0, backlogTarget-float64(backlog))

if config.RateMode {
rateTarget := calculateSineTarget(elapsed, config.RatePeriod, config.MinAddRate, config.MaxAddRate)
now := time.Now()
rateActivities := rateTarget * now.Sub(lastIteration).Seconds()
lastIteration = now
deficit = math.Max(deficit, rateActivities)
}
toStart := int64(math.Round(deficit))
toStart = min(toStart, config.MaxRate)
for batch := range batches(toStart, config.BatchSize) {
startWG.Go(func() {
errCh <- e.spawnWorkflowWithActivities(ctx, iter, batch, config.SleepActivityConfig)
})
iter++
}
case <-backlogTicker.C:
e.Logger.Debugf("Backlog: %d, target: %d, last iter: %d, started: %d, completed: %d",
backlog, target, activities, started, completed)
e.Logger.Infof("Backlog: %d, target: %d, started: %d, completed: %d",
backlog, target, started, completed)
}
}

Expand Down Expand Up @@ -281,7 +332,6 @@ func (e *ebbAndFlowExecutor) spawnWorkflowWithActivities(

// Start workflow.
run := e.NewRun(int(iteration))
e.RegisterDefaultSearchAttributes(ctx)
options := run.DefaultStartWorkflowOptions()
options.ID = fmt.Sprintf("%s-track-%d", e.id, iteration)
options.WorkflowExecutionErrorWhenAlreadyStarted = false
Expand Down Expand Up @@ -324,9 +374,28 @@ func calculateBacklogTarget(
elapsed, period time.Duration,
minBacklog, maxBacklog int64,
) int64 {
return int64(math.Round(calculateSineTarget(elapsed, period, float64(minBacklog), float64(maxBacklog))))
}

func calculateSineTarget(elapsed, period time.Duration, minVal, maxVal float64) float64 {
periods := elapsed.Seconds() / period.Seconds()
osc := (math.Sin(2*math.Pi*(periods-0.25)) + 1.0) / 2
backlogRange := float64(maxBacklog - minBacklog)
baseTarget := float64(minBacklog) + osc*backlogRange
return int64(math.Round(baseTarget))
return minVal + osc*(maxVal-minVal)
}

// batches yields batch sizes that partition total into chunks of at most
// batchSize. If batchSize <= 0, yields total as a single batch.
func batches(total, batchSize int64) iter.Seq[int64] {
return func(yield func(int64) bool) {
for total > 0 {
batch := total
if batchSize > 0 {
batch = min(batch, batchSize)
}
total -= batch
if !yield(batch) {
return
}
}
}
}
32 changes: 32 additions & 0 deletions scenarios/ebb_and_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@ import (
"github.com/temporalio/omes/workers"
)

func TestEbbAndFlowRateMode(t *testing.T) {
t.Parallel()

env := workers.SetupTestEnvironment(t,
workers.WithExecutorTimeout(2*time.Minute))

scenarioInfo := loadgen.ScenarioInfo{
RunID: fmt.Sprintf("eaf-rate-%d", time.Now().Unix()),
Configuration: loadgen.RunConfiguration{
Duration: 10 * time.Second,
},
ScenarioOptions: map[string]string{
MinAddRateFlag: "1",
MaxAddRateFlag: "3",
RatePeriodFlag: "5s",
MinBacklogFlag: "0",
MaxBacklogFlag: "3",
BacklogPeriodFlag: "5s",
PeriodFlag: "5s",
BacklogLogIntervalFlag: "5s",
VisibilityVerificationTimeoutFlag: "5s",
},
}

executor := newEbbAndFlowExecutor()
_, err := env.RunExecutorTest(t, executor, scenarioInfo, clioptions.LangGo)
require.NoError(t, err, "Rate mode executor should complete successfully")

state := executor.Snapshot().(ebbAndFlowState)
require.GreaterOrEqual(t, state.TotalCompletedWorkflows, int64(1))
}

func TestEbbAndFlow(t *testing.T) {
t.Parallel()

Expand Down
17 changes: 2 additions & 15 deletions workers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error {
// The process metrics sidecar (with /info endpoint) is started by run.go, not the worker.
args = append(args, passthrough(r.ClientOptions.FlagSet(), "")...)
args = append(args, passthrough(r.LoggingOptions.FlagSet(), "")...)
args = append(args, passthroughExcluding(r.MetricsOptions.FlagSet("worker-"), "worker-", "process-metrics-address", "metrics-version-tag")...)
args = append(args, passthrough(r.MetricsOptions.FlagSet("worker-"), "worker-", "process-metrics-address", "metrics-version-tag")...)
args = append(args, passthrough(r.WorkerOptions.FlagSet(), "worker-")...)

cmd, err := prog.NewCommand(context.Background(), args...)
Expand Down Expand Up @@ -208,20 +208,7 @@ func (r *Runner) Run(ctx context.Context, baseDir string) error {
}
}

func passthrough(fs *pflag.FlagSet, prefix string) (flags []string) {
fs.VisitAll(func(f *pflag.Flag) {
if !f.Changed {
return
}
flags = append(flags, fmt.Sprintf("--%s=%s",
strings.TrimPrefix(f.Name, prefix),
f.Value.String(),
))
})
return
}

func passthroughExcluding(fs *pflag.FlagSet, prefix string, exclude ...string) (flags []string) {
func passthrough(fs *pflag.FlagSet, prefix string, exclude ...string) (flags []string) {
excludeSet := make(map[string]bool)
for _, e := range exclude {
excludeSet[e] = true
Expand Down
Loading