diff --git a/module/executiondatasync/optimistic_sync/pipeline/pipeline2.go b/module/executiondatasync/optimistic_sync/pipeline/pipeline2.go new file mode 100644 index 00000000000..be26f1929cb --- /dev/null +++ b/module/executiondatasync/optimistic_sync/pipeline/pipeline2.go @@ -0,0 +1,445 @@ +package pipeline + +import ( + "context" + "fmt" + + "github.com/gammazero/workerpool" + "github.com/rs/zerolog" + "go.uber.org/atomic" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/executiondatasync/optimistic_sync" + "github.com/onflow/flow-go/module/irrecoverable" +) + +// Pipeline2 represents a pipelined state machine for processing a single ExecutionResult. +// +// The state machine is initialized in the Pending state and permits state transitions as specified by the diagram below. Intuitively, +// we have a sequence of tasks (Downloading, Indexing, Persisting) that must run one after another, each task being executed at most +// once. When the conditions have been satisfied for the next task to commence, the state machine atomically transitions to the next +// state, which signifies that the task is in progress, and the task is put into the queue of a worker pool for execution. For example, +// if the pipeline is in the Downloading state, it means that the downloading task is about to be executed (or already in progress). +// By making the state transition Pending → Downloading atomic, we guarantee that the Downloading Task is only ever scheduled once. +// Similar reasoning applies to the other tasks. +// +// Trigger condition: (1)▷┬┄┄┄┄┄┐ ┌┄┄┐ Trigger condition (3): +// parent is ┊ ╭∇─────────────────────────╮ ┊ ╭∇────────────────────────────╮ ┌┄(3)▷┄┬┄┄┄┄┐ sealed & parent completed +// Downloading ┊ │Task Downloading (once): │ ┊ │Task Indexing (once): │ ┊ ┊ ╭∇───────────────────────╮ +// or Indexing ┊ │1. download │ ┊ │1. build index from download │ ┊ ┊ │Task Persisting (once): │ +// or WaitingPersist ┊ │3. add Task Indexing (2)▷┄│┄┘ │3. check trigger (3) ▷?┄┄┄┄┄│┄┘ ┊ │- database write │ +// or Persisting ┊ │2. CAS state transition │ │2. CAS state transition │ ┊ │- CAS state transition │ +// or Complete ┊ │ downloading → indexing │ │ indexing → WaitingPersist │ ┊ │ Persisting → Complete │ +// ┊ │ ▷┄┄┄┄┄┄┐ │ │ ▷┄┐ │ ┊ │ ∇ │ +// ┏━━━━━━━━━━━━━━━━━━━┓ ┊ ┏━━━━━━━━━━━━━━━━┓ ┊ ┏━━━━━━━━━━━━━━━┓ ┊ ┏━━━━━━━━━━━━━━━━┓ ┊ ┏━━━━━━━━━━━━┓ ┊ ┏━━━━━━━━━━┓ +// ┃ state ┃ ┊ ┃ state ┃ ┊ ┃ state ┃ ┊ ┃ state ┃ ┊ ┃ state ┃ ┊ ┃ state ┃ +// ┃ Pending ┃──●─▶┃ Downloading ┃───────●─▶┃ Indexing ┃───●─▶┃ WaitingPersist ┃───●─▶┃ Persisting ┃─●─▶┃ Complete ┃ +// ┗━━━━━━━━━━━━┯━━━━━━┛ ┗━━━━━━━┯━━━━━━━━┛ ┗━━━━━━━━━━┯━━━━┛ ┗━━━━━━┯━━━━━━━━━┛ ^ ┗━━━━━━━━━━━━┛ ┗━━━━━━━━━━┛ +// ┃ ╰─────┃────────────────────╯ ╰─────┃──────────────────┃────╯ ^ ╰────────────────────────╯ +// ┃ ┃ ┃ ┃ ^ +// ┃ ┃ ┏━━━━━━━━━━━━━━━┓ ┃ ┃ This state transition only happens +// ┗━━━━━━━━━━━━━━━━━━━━┻━━━━━━━▶ Abandoned ◀━━━━━━┻━━━━━━━━━━━━━━━━━━┛ after this result is sealed. Hence, +// ┗━━━━━━━━━━━━━━━┛ the result can no longer be abandoned +// +// Note that the order of items in Tasks Downloading and Indexing has been swapped for ease of visualization (order 1, 3, 2). The term +// 'CAS' refers to an atomic Compare-And-Swap operation on the pipeline's state. The CAS operation (step 2) acts as a gatekeeper +// ensuring that only a single routine can successfully apply the state transition. The task (step 3) is scheduled by a go-routine only +// after this routine successfully performed the state transition. Thereby, we guarantee that a task is only ever scheduled once. +// All computationally heavy or blocking work (Tasks Downloading, Indexing, Persisting encapsulated in [Core]) is delegated to worker +// pools. By utilizing CAS operations, we eliminate the need for locking the pipeline. Therefore, the pipeline's public methods have +// computationally negligible latency, effectively returning almost immediately. Hence, the pipeline can use the calling goroutine +// to perform its lifecycle updates (no need for internal goroutines beyond the injected worker pools), which improves concurrency +// and simplifies the correctness prove for the implementation significantly. +// +// Trigger condition (1), permitting state transition Pending → Downloading: +// ▷ Parent must be in state Downloading or Indexing or WaitingPersist or Persisting or Complete. +// This can be implement as listening to the `OnParentStateUpdated` notification. +// (assuming no state change was missed, while pipeline is being added to forest) +// +// Trigger condition (2), permitting state transition Downloading → Indexing: +// ▷ The Downloading Task completed successfully. +// As we have a single routine executing the downloading task, this routine should always +// be able to perform the state transition, barring the processing being abandoned. +// +// Trigger condition (3), permitting state transition WaitingPersist → Persisting: +// ▷ Pipeline must represent a sealed result _and_ parent's state must be Complete. +// This can be implement by listening to the notifications `OnParentStateUpdated` and `SetSealed`. Whichever notification delivers the +// last piece of required information also transitions updates performs the state transition WaitingPersist → Persisting (atomically, +// via CAS) and schedules the `Persisting` task. +// +// REQUIREMENTS for the PIPELINE implementation: +// (I) All tasks are executed at most once. +// (II) Trigger conditions being satisfied must not be missed. +// +// The only exceptions to rule (II) are: +// - Triggers can be missed if the pipeline reaches the Abandoned state. In this case, no further processing is desired and work +// already done can be discarded. Hence, interim triggers for processing work can be ignored. +// - The trigger to abandon processing can be missed (with low probability). In this case, we will do extra work but eventually +// the result will be garbage collected. Doing more work optimistically to improve latency than minimally necessary is acceptable. +// +// REQUIREMENTS for the HIGHER-level BUSINESS LOGIC instantiating and orchestrating Pipelines: +// In a concurrent environment, there might be a "blind period" between a `Pipeline2` instance being created and it being subscribed to +// the information sources emitting `OnParentStateUpdated` and `SetSealed` notifications. Notifications concurrently emitted during such +// "blind period" might violate requirement (II). Design patters for the higher-level business logic to AVOID a "BLIND PERIODS" are: +// - Atomic instantiation-and-subscription: +// The `Pipeline2` instance is created and subscribed to the notification sources within a single atomic operation. The result's own +// sealing status as well as the status of the parent result do not change during this atomic operation. +// - Instantiate-and-subscribe + Catchup: +// After successful instantiating a Pipeline2 object and subscribing it to `OnParentStateUpdated` plus `SetSealed`, the higher-level +// business logic does a second iteration over the result's sealing status and the parent result's status. It reads the status from +// the _sources directly_, not relying on notifications. This newest information about the most up-to-date status is then forwarded +// to the `Pipeline2` instance. When implementing this approach, we must follow an information-driven design: +// https://www.notion.so/flowfoundation/Reactive-State-Management-in-Distributed-Systems-A-Guide-to-Eventually-Consistent-Designs-22d1aee1232480b2ad05e95a7c48a32d +// Formally, you might think of the notifications as sets (of information), where the set relations defines a partial order: +// Pending ⊂ Downloading ⊂ Indexing ⊂ WaitingPersist ⊂ Complete +// (Pending ∪ Downloading ∪ Indexing ∪ WaitingPersist) ⊂ Abandoned +// This implies: +// (a) Pipeline2 must be idempotent with respect to `OnParentStateUpdated` and `SetSealed` invocations. +// (b) Pipeline2 must recognize old information as such, which is already reflected in its internal state, and ignore it. For example, +// the Pipeline being in the state `Indexing` should be understood as "we should download the data and index it once we have it". +// Then being informed (old information) that we have progressed to a point where we should be downloading the data, will be a no-op, +// because we already know that (and more, namely that we should be indexing the data in addition once we have downloaded it). +// (c) Pipeline2 must recognize, when notifications deliver newer information but skipp some intermediate state transitions. Pipeline2 +// can't just silently discard that information. Either, we return a sentinel error, signalling to the caller that they need to re- +// deliver missing interim notifications. Or (approach implemented) Pipeline2 advances the state internally to be up-to-date with the +// newest information it received. This can happen as a race condition in a concurrent environment: for example, assume that Pipeline2 +// is in the "Pending" state. It missed the notification that its parent has progressed to `Indexing`. While the higher-level business +// logic is just about the deliver the newer information, the parent transitions to Complete and emits a notification that arrives first. +// +// In summary, ALL PUBLIC METHODS of Pipeline2 are CONCURRENCY safe, IDEMPOTENT, and tolerate information to arrive OUT-OF-ORDER. As long as +// notifications eventually arrive, the pipeline will reach the same state as if all notifications are delivered in their canonical order. +// Specifically, the pipeline can consume information from multiple independent sources concurrently, without requiring any external ordering +// or synchronization. For example, one information source might already be telling us about the parent's most recent new state, while a +// second information source could still be informing us about interim states that the parent has already passed through. +// This design has major benefits: the higher-level business logic can use independent algorithmic paths concurrently to update and +// orchestrate pipelines. This is the most robust design, where the caller has to know the least about the internal state of the pipeline +// and its parent. The caller can just feed it with all information they have (and receive) - the pipeline will incorporate it correctly. +// In a nutshell, the sole requirement a pipeline has on its input notifications / information is that they are explainable by +// valid state machine evolutions. +type Pipeline2 struct { + log zerolog.Logger + stateConsumer optimistic_sync.PipelineStateConsumer + core optimistic_sync.Core + + downloadingWorkerPool *workerpool.WorkerPool + indexingWorkerPool *workerpool.WorkerPool + persistingWorkerPool *workerpool.WorkerPool + + rootContext context.Context // from the component managing the Results Forest and all its pipelines + // -> child context (aka "pipeline context") with cancel -> store pipeline context and the cancel in the pipeline here + // when submitting a task to a workerpool, use the pipeline context + + signalerCtx irrecoverable.SignalerContext // the escalate irrecoverable errors to the higher-level business logic + + // The following fields are accessed externally. they are stored using atomics to avoid + // blocking the caller. + state *optimistic_sync.State2Tracker // current state of the pipeline + parentStateTracker *optimistic_sync.State2Tracker // latest known state of the parent result; eventually consistent: might lag behind actual parent state! + + // TODO: remove the following fields, if they are no longer used (?) + + stateChangedNotifier engine.Notifier // deprecated: to be removed + worker *worker // deprecated: to be removed + isAbandoned *atomic.Bool // deprecated: to be removed + isIndexed *atomic.Bool // deprecated: to be removed + + // TODO: to be moved to Core ? ¯\_(ツ)_/¯ + isSealed *atomic.Bool // deprecated +} + +var _ optimistic_sync.Pipeline = (*Pipeline2)(nil) + +// NewPipeline2 creates a new processing pipeline. +// The pipeline is initialized in the Pending state. +func NewPipeline2( + log zerolog.Logger, + downloadingWorkerPool *workerpool.WorkerPool, + indexingWorkerPool *workerpool.WorkerPool, + persistingWorkerPool *workerpool.WorkerPool, + executionResult *flow.ExecutionResult, + stateReceiver optimistic_sync.PipelineStateConsumer, +) (*Pipeline2, error) { + log = log.With(). + Str("component", "pipeline"). + Str("execution_result_id", executionResult.ExecutionDataID.String()). + Str("block_id", executionResult.BlockID.String()). + Logger() + + myState, err := optimistic_sync.NewState2Tracker(optimistic_sync.State2Pending) + if err != nil { + return nil, fmt.Errorf("could not create pipeline state tracker: %w", err) + } + parentState, err := optimistic_sync.NewState2Tracker(optimistic_sync.State2Pending) + if err != nil { + return nil, fmt.Errorf("could not create pipeline state tracker: %w", err) + } + + return &Pipeline2{ + log: log, + stateConsumer: stateReceiver, + worker: newWorker(), + state: myState, + parentStateTracker: parentState, + isSealed: atomic.NewBool(false), + isAbandoned: atomic.NewBool(false), + isIndexed: atomic.NewBool(false), + stateChangedNotifier: engine.NewNotifier(), + downloadingWorkerPool: downloadingWorkerPool, + indexingWorkerPool: indexingWorkerPool, + persistingWorkerPool: persistingWorkerPool, + }, nil +} + +func (p *Pipeline2) Run(ctx context.Context, core optimistic_sync.Core, parentState optimistic_sync.State) error { + panic("Run is deprecated, implementation now utilizes `State2Tracker` for atomicity and workerpools for any work with non-vanishing runtime") +} + +// checkAbandoned returns true if the pipeline or its parent are abandoned. +func (p *Pipeline2) checkAbandoned() bool { + if p.isAbandoned.Load() { + return true + } + if p.parentState() == optimistic_sync.StateAbandoned { + return true + } + return p.GetState() == optimistic_sync.StateAbandoned +} + +// GetState returns the current state of the pipeline. +func (p *Pipeline2) GetState() optimistic_sync.State { + return optimistic_sync.State(p.state.Value()) +} + +// SetSealed marks the execution result as sealed. +// This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing. +func (p *Pipeline2) SetSealed() { + // Note: do not use a mutex here to avoid blocking the results forest. + if p.isSealed.CompareAndSwap(false, true) { + p.stateChangedNotifier.Notify() + } +} + +// OnParentStateUpdated atomically updates the pipeline's based on the provided information about the parent state. +// This function follows an information-driven, eventually consistent design: +// - `parentState` tells us that the parent result has progressed to _at least_ that state +// (it might be further ahead, and we just haven't received information about it). +// - Idempotency: redundant (older) information is handled gracefully. +// - New information skipping intermediate states is handled gracefully. +// +// We assume that new information about the parent state changing can arrive through different algorithmic paths concurrently. +// This means that one information source might already be telling us about the parent's most recent new state, while a second +// information source could still be informing us about interim states that the parent has already passed through. This is the +// most robust design, where the caller has to know the least about the internal state of the pipeline and its parent. The +// caller can just feed us with all information they have about the parent's state, and the pipeline will incorporate it correctly. +func (p *Pipeline2) OnParentStateUpdated(parentState optimistic_sync.State2) { + // Important: We assume that information (from different data sources) might arrive out of order or could be redundant. + // We utilize the atomic `State2Tracker` to optimistically attempt evolving the state. If the state's tracker accepts, we know + // exactly what state evolution has taken place (could be multiple state transitions at once, if we haven't heard about some + // intermediate state transitions yet). If the state's tracker rejects the update, we retrospectively analyze what whether + // this rejection was expected or is a symptom of a bug or state corruption (in the latter case safe continuation is impossible). + + oldParentState, parentSuccess := p.parentStateTracker.Evolve(parentState) + if !parentSuccess { + panic("to be implemented") + } + // reaching the code below means we have successfully evolved the parent state's tracker from `oldParentState` to `parentState` + + if oldParentState == parentState { + return // redundant information, no state change + } + + // (II) We assume that notifications about the parent state changing are delivered eventually (no notifications are missed). Despite + // notifications arriving out of order or being late, we still will eventually learn that the parent has transitioned out of the pending + // state. Hence, condition (II) is satisfied if the parent is not abandoned. However, if the parent is abandoned, this pipeline is by + // definition also abandoned and the trigger can be missed (see exceptions to requirement (II) in the Pipeline2 struct specification). + p.checkTrigger1(parentState) + + // TODO: explain why requirement (II) is satisfied for Trigger Condition (3) + p.checkTrigger3(parentState) + +} + +// checkTrigger1 checks Trigger Condition (1): parent is in `Downloading` or `Indexing` or `Persisting` `WaitingPersist` or `Complete`. +// If this pipeline is in state `Pending` and (1) is satisfied, we transition to `Downloading` and schedule the downloading task. +// +// Concurrency safe, idempotent and tolerates information to arrive out of order about the parent state changing. +// This function guarantees that requirement (I) is satisfied for trigger condition (1). +// However, the caller must ensure that requirement (II) is satisfied. +func (p *Pipeline2) checkTrigger1(parentState optimistic_sync.State2) { + // Check Trigger Condition (1): we want to avoid erroneously continuing in case of bugs and broken future refactorings. Instead of specifying + // for which states we do not want to trigger, we explicitly specify for which states we do want to trigger. Thereby, broken future refactorings + // are more likely to produce liveness failures instead of incorrectly continuing processing and producing incorrect results. + trigger1Satisfied := parentState == optimistic_sync.State2Downloading || parentState == optimistic_sync.State2Indexing || + parentState == optimistic_sync.State2WaitingPersist || parentState == optimistic_sync.State2Persisting || + parentState == optimistic_sync.State2Complete + if !trigger1Satisfied { + return + } + + // If and only if Pipeline is in `Pending` state, we atomically transition to `Downloading`. In a second (non-atomic) step, we schedule + // the downloading task. We must show that this satisfies requirement (I): + // (I) We attempt to atomically transition the pipeline state from `Pending` to `Downloading` with an atomic CompareAndSwap operation. At + // most one go-routine can apply this operation successfully and proceed to schedule the downloading task, which satisfies requirement (I). + _, success := p.state.CompareAndSwap(optimistic_sync.State2Pending, optimistic_sync.State2Downloading) + if !success { + // some other thread transitioned the pipeline out of Pending state already and should have scheduled the downloading task + return // noting more to do + } + p.stateConsumer.OnStateUpdated(optimistic_sync.State2Downloading) + p.downloadingWorkerPool.Submit(p.downloadTask) +} + +// downloadTask packages the work of downloading the execution result data (encapsulated in [Core.Download]) +// with the lifecycle updates of the pipeline's state machine. Once the call returns from Core without error, +// the downloading work is complete, and we transition the pipeline state from `Downloading` to `Indexing`. +// This should _always_ succeed as the downloading task is the only place allowed to perform this state transition. +// +// TODO: +// Unexpected errors returned from [Core.Download] are escalated as irrecoverable exceptions to the higher-level +// business logic via the SignalerContext. +// +// CAUTION: not idempotent! According to Pipeline2 struct specification, this task is only executed once. +// The caller is responsible for satisfying requirements (I) and (II) for the downloading task (see +// Pipeline2 struct specification for details). +func (p *Pipeline2) downloadTask() { + panic("update core to allow the following commented-out code: ") + err := p.core.Download(ctx) + if err != nil { + TODO Expected error returns during normal operations: + - context.Canceled: when the context is canceled + } + + // Downloading Task finished successful. Atomically transition pipeline state to Indexing. In a second (non-atomic) step, we schedule + // the indexing task. We must show that this satisfies requirement (I) and (II): + // (I) We attempt to atomically transition the pipeline state from `Downloading` to `Indexing` with an atomic CompareAndSwap operation. At + // most one go-routine can apply this operation successfully and proceed to schedule the indexing task, which satisfies requirement (I). + // (II) Assuming that the algorithm up to this point is correct, trigger conditions are not missed for the downloading task to start. + // Hence, once the downloading task is complete, we can always proceed to schedule the indexing task, as no additional trigger conditions + // are required. Hence, condition (II) is satisfied also for the indexing task. + _, success := p.state.CompareAndSwap(optimistic_sync.State2Downloading, optimistic_sync.State2Indexing) + if !success { + panic("this state transition must always succeed") + // TODO: emit irrecoverable here and escalate it upwards to the higher-level engine + } + p.stateConsumer.OnStateUpdated(optimistic_sync.State2Indexing) + p.indexingWorkerPool.Submit(p.indexingTask) +} + +// indexingTask packages the work of indexing the execution result (encapsulated in [Core.Index]) +// with the lifecycle updates of the pipeline's state machine. Once the call returns from Core without error, +// the Index work is complete, and we transition the pipeline state from `Indexing` to `WaitingPersist`. This +// should _always_ succeed as the indexing task is the only place allowed to perform this state transition. +// +// TODO: +// Unexpected errors returned from [Core.Index] are escalated as irrecoverable exceptions to the higher-level +// business logic via the SignalerContext. +// +// CAUTION: not idempotent! According to Pipeline2 struct specification, this task is only executed once. +// The caller is responsible for satisfying requirements (I) and (II) for the indexing task (see +// Pipeline2 struct specification for details). +func (p *Pipeline2) indexingTask() { + err := p.core.Index() + if err != nil { + // TODO Expected error returns during normal operations: + // - context.Canceled: when the context is canceled + } + + // Indexing Task finished successful. Atomically transition pipeline state to WaitingPersist: + _, success := p.state.CompareAndSwap(optimistic_sync.State2Indexing, optimistic_sync.State2WaitingPersist) + if !success { + panic("this state transition must always succeed") + // TODO: emit irrecoverable here and escalate it upwards to the higher-level engine + } + p.stateConsumer.OnStateUpdated(optimistic_sync.State2WaitingPersist) + + // Check Trigger Condition (3): + p.checkTrigger3(p.parentStateTracker.Value()) +} + +// checkTrigger3 checks Trigger Condition (3): this result is sealed and the processing the parent is Complete. +// If this pipeline is in state `WaitingPersist` and (3) is satisfied, we transition to state `Persisting` and schedule the Persisting task. +// +// Concurrency safe, idempotent and tolerates information to arrive out of order about the parent state changing. +// This function guarantees that requirement (I) is satisfied for trigger condition (1). +// However, the caller must ensure that requirement (II) is satisfied. +func (p *Pipeline2) checkTrigger3(parentState optimistic_sync.State2) { + trigger3Satisfied := parentState == optimistic_sync.State2Complete && p.IsSealed() + if !trigger3Satisfied { + return + } + + // If and only if Pipeline is in `WaitingPersist` state, we atomically transition to `Persisting`. In a second (non-atomic) step, we schedule + // the persisting task. We must show that this satisfies requirement (I): + // (I) We attempt to atomically transition the pipeline state from `WaitingPersist` to `Persisting` with an atomic CompareAndSwap operation. + // At most one go-routine can apply this operation successfully and proceed to schedule the Persisting task, which satisfies requirement (I). + _, success := p.state.CompareAndSwap(optimistic_sync.State2WaitingPersist, optimistic_sync.State2Persisting) + if !success { + // * We might not have reached `WaitingPersist` yet. In this case, the indexing task will check Trigger Condition (3) again once it is done. + // * We might have already transitioned from `WaitingPersist` to `Persisting`. In this case, the persisting task has already been scheduled. + // * The only remaining possibility is that the pipeline is Abandoned. + return // In all cases, there is nothing more to do here. + } + p.stateConsumer.OnStateUpdated(optimistic_sync.State2Persisting) + p.indexingWorkerPool.Submit(p.persistingTask) +} + +// persistingTask packages the work of saving the indexed execution result to the database (encapsulated in +// [Core.Persist]) with the lifecycle updates of the pipeline's state machine. Once the call returns from Core without +// error, the persisting work is complete, and we transition the pipeline state from `Persisting` to `Complete`. This +// should _always_ succeed as the persisting task is the only place allowed to perform this state transition. +// +// TODO: +// Unexpected errors returned from [Core.Persist] are escalated as irrecoverable exceptions to the higher-level +// business logic via the SignalerContext. +// +// CAUTION: not idempotent! According to Pipeline2 struct specification, this task is only executed once. +// The caller is responsible for satisfying requirements (I) and (II) for the indexing task (see +// Pipeline2 struct specification for details). +func (p *Pipeline2) persistingTask() { + err := p.core.Persist() + if err != nil { + // TODO Are there any expected error returns here? Maybe (?): + // - context.Canceled: when the context is canceled (e.g. during graceful shutdown). + // We could include it for uniformity here, even though the implementation might never return this sentinel. + } + + // Persisting Task finished successful. Atomically transition pipeline state to Complete: + _, success := p.state.CompareAndSwap(optimistic_sync.State2Persisting, optimistic_sync.State2Complete) + if !success { + panic("this state transition must always succeed") + // TODO: emit irrecoverable here and escalate it upwards to the higher-level engine + } + p.stateConsumer.OnStateUpdated(optimistic_sync.State2Complete) +} + +// IsSealed returns true if and only if the execution result has been marked as sealed. +func (p *Pipeline2) IsSealed() bool { + panic("implement me") +} + +// Abandon leaves the pipeline in the abandoned state. +// The sole requirement a pipeline has on its input notifications / information is that they are explainable +// by valid state machine evolutions. This method is concurrency safe, idempotent and tolerates information +// to arrive out of order. In general, the state evolution to `Abandoned` should always succeed. The returned +// error [ErrInvalidStateEvolutionEpoch] should be considered in most cases a symptom of a bug or state +// corruption, indicating that the provided inputs are not explainable by valid state machine evolution. In +// this case, a safe continuation is impossible. +// +// No error returns expected during normal operations. +func (p *Pipeline2) Abandon() error { + priorState, success := p.state.Set(optimistic_sync.State2Abandoned) // idempotent, as our state machine is reflexive + if !success { // none of the following should be compatible with a valid state evolution; + return fmt.Errorf("state evolution %s to %s attempted, with current sealing status of %t: %w", priorState, optimistic_sync.State2Abandoned, p.IsSealed(), ErrInvalidStateEvolution) + } + if priorState != optimistic_sync.State2Abandoned { + // we successfully applied the state transition for the first time. Subsequent calls will _not_ execute this block. + // TODO: + panic("try to cancel pending jobs if there are any? maybe with signaller context, which we also use for critical errors for the workers?") + } + return nil +} + +// ErrInvalidStateEvolution indicates that the provided inputs are not explainable by valid state machine evolution. +// In most cases, this error is a symptom of a bug or state corruption, making safe continuation impossible. +var ErrInvalidStateEvolution = fmt.Errorf("provided inputs are not explainable by valid state machine evolution") diff --git a/module/executiondatasync/optimistic_sync/state.go b/module/executiondatasync/optimistic_sync/state.go index 435c08bad8c..9b14bff1484 100644 --- a/module/executiondatasync/optimistic_sync/state.go +++ b/module/executiondatasync/optimistic_sync/state.go @@ -1,5 +1,12 @@ package optimistic_sync +import ( + "errors" + "fmt" + + "go.uber.org/atomic" +) + // State represents the state of the processing pipeline type State int32 @@ -38,3 +45,229 @@ func (s State) String() string { func (s State) IsTerminal() bool { return s == StateComplete || s == StateAbandoned } + +// ErrInvalidPipelineState is returned when the initial value provided to NewState2Tracker is +// not a valid starting state for the State2 state machine. +var ErrInvalidPipelineState = errors.New("invalid pipeline state") + +// State2 represents the state of the processing pipeline with granular download/indexing phases. +// +// State Transitions (see Pipeline2 spec): +// +// ┏━━━━━━━━━┓ ┏━━━━━━━━━━━━━┓ ┏━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━┓ ┏━━━━━━━━━━┓ +// ┃ Pending ┃───►┃ Downloading ┃───►┃ Indexing ┃───►┃ WaitingPersist ┃───►┃ Persisting ┃───►┃ Complete ┃ +// ┗━━━━┯━━━━┛ ┗━━━━━━┯━━━━━━┛ ┗━━━━━┯━━━━┛ ┗━━━━━━━┯━━━━━━━━┛ ┗━━━━━━━━━━━━┛ ┗━━━━━━━━━━┛ +// │ │ │ │ +// └────────────────┴─────────────────┴─────────────────┴────►┏━━━━━━━━━━━┓ +// ┃ Abandoned ┃ +// ┗━━━━━━━━━━━┛ +// +// Properties of our state transition function: +// - reflexive: transitioning to the same state is allowed (no-op) for *valid* states +type State2 uint32 + +const ( + // State2Pending is the initial state after instantiation, before processing begins. + State2Pending State2 = iota + // State2Downloading represents the state where ExecutionData download is in progress. + State2Downloading + // State2Indexing represents the state where building the index from downloaded data is in progress. + State2Indexing + // State2WaitingPersist represents the state where all data is indexed, but conditions to persist are not met. + State2WaitingPersist + // State2Persisting represents the state where the conditions to persist are met, but the persisting process hasn't finished. + State2Persisting + // State2Complete represents the terminal state where all data is persisted to storage. + State2Complete + // State2Abandoned represents the terminal state where processing was aborted. + State2Abandoned +) + +// String returns the string representation of State2. +func (s State2) String() string { + switch s { + case State2Pending: + return "pending" + case State2Downloading: + return "downloading" + case State2Indexing: + return "indexing" + case State2WaitingPersist: + return "waiting2persist" + case State2Persisting: + return "persisting" + case State2Complete: + return "complete" + case State2Abandoned: + return "abandoned" + default: + return "unknown" + } +} + +// IsTerminal returns true if the state is a terminal state (Complete or Abandoned). +func (s State2) IsTerminal() bool { + return s == State2Complete || s == State2Abandoned +} + +// IsValid returns true if the state is a valid State2 value. +func (s State2) IsValid() bool { + switch s { + case State2Pending, State2Downloading, State2Indexing, State2WaitingPersist, State2Persisting, State2Complete, State2Abandoned: + return true + default: + return false + } +} + +// CanReach returns true if and only if there exists a _sequence_ of valid state transitions (zero or more) +// that reaches the `to` state. Formally, this is the reflexive transitive closure of the state transition +// function [State2.IsValidTransition]. +// ATTENTION: this function considers MULTI-STEP state transitions (zero or more). +// To query the single-step state transition function, use function [State2.IsValidTransition]. +func (s State2) CanReach(to State2) bool { + if to == s && to.IsValid() { // our state transition is defined to be reflexive: a valid state can always transition to itself + return true + } + // State2Complete and State2Abandoned are terminal states, only allowing self-transitions, which are covered + // above. Hence, in the following, we only need to handle `State2Pending` up to `State2Persisting`. + + switch s { + case State2Pending: + return to == State2Downloading || to == State2Indexing || to == State2WaitingPersist || to == State2Persisting || to == State2Complete || to == State2Abandoned + case State2Downloading: + return to == State2Indexing || to == State2WaitingPersist || to == State2Persisting || to == State2Complete || to == State2Abandoned + case State2Indexing: + return to == State2WaitingPersist || to == State2Persisting || to == State2Complete || to == State2Abandoned + case State2WaitingPersist: + return to == State2Persisting || to == State2Complete || to == State2Abandoned + case State2Persisting: + return to == State2Complete + default: + return false + } +} + +// IsValidTransition returns true if transitioning from this state to the given state is valid. +// Transitioning to the same state is considered valid (no-op), if and only of the state is valid. +// ATTENTION: this function represents exclusively SINGLE-STEP state transitions. +// For multi-step reachability, use function [State2.CanReach]. +func (s State2) IsValidTransition(to State2) bool { + if to == s && to.IsValid() { // our state transition is defined to be reflexive: a valid state can always transition to itself + return true + } + // State2Complete and State2Abandoned are terminal states, only allowing self-transitions, which are covered + // above. Hence, in the following, we only need to handle `State2Pending` up to `State2WaitingPersist`. + + switch s { + case State2Pending: + return to == State2Downloading || to == State2Abandoned + case State2Downloading: + return to == State2Indexing || to == State2Abandoned + case State2Indexing: + return to == State2WaitingPersist || to == State2Abandoned + case State2WaitingPersist: + return to == State2Persisting || to == State2Abandoned + case State2Persisting: + return to == State2Complete + default: + return false + } +} + +// IsValidInitialState returns true if the state is a valid initial state for State2Tracker. +// We only allow starting from State2Pending since the pipeline +// always starts from the beginning. +func (s State2) IsValidInitialState() bool { + return s == State2Pending +} + +// State2Tracker is a concurrency-safe tracker for State2 using atomic operations. +// It enforces valid state transitions as defined by [State2.IsValidTransition]. +// Concurrent write-read establishes a 'happens before' relation as detailed in https://go.dev/ref/mem +type State2Tracker struct { + // The constructor follows the prevalent pattern of returning a reference type. + // We embed the atomic Uint32 directly to avoid extra pointer dereferences for performance reasons. + state2 atomic.Uint32 +} + +// NewState2Tracker instantiates a concurrency-safe state machine with valid state transitions +// as specified in State2.IsValidTransition. The intended use is to track the processing state +// of an ExecutionResult in the Pipeline2. +// +// TODO: at the moment, the input is unnecessary, because only the initial state `State2Pending` is accepted +// +// Expected error returns during normal operations: +// - [ErrInvalidPipelineState]: if the initial value is not a valid starting state +func NewState2Tracker(initialState State2) (*State2Tracker, error) { + if !initialState.IsValidInitialState() { + return nil, fmt.Errorf("state '%s' is not a valid starting state: %w", initialState.String(), ErrInvalidPipelineState) + } + return &State2Tracker{ + state2: *(atomic.NewUint32(uint32(initialState))), + }, nil +} + +// Set attempts to transition the state to the new state. +// The transition succeeds if and only if it is valid as defined by [State2.IsValidTransition]. +// No matter whether the state transition succeeds (second return value), the `oldState` return +// value is always the state from which the transition was attempted. +// ATTENTION: this function performs a SINGLE-STEP state transition. +// For multi-step state evolution, use function [State2.Evolve]. +func (t *State2Tracker) Set(newState State2) (oldState State2, success bool) { + for { + oldState = t.Value() + if !oldState.IsValidTransition(newState) { + return oldState, false + } + if t.state2.CompareAndSwap(uint32(oldState), uint32(newState)) { + return oldState, true + } + } +} + +// Evolve attempts a _sequence_ of valid state transitions (zero or more) to reach the specified `target` +// from the current state. The entire sequence is performed as a single ATOMIC operation. The state +// evolution succeeds if and only if it is valid as defined by [State2.CanReach]. No matter whether the +// state evolution succeeds (second return value), the `oldState` return value is always the state from +// which the evolution was attempted. +// ATTENTION: this function performs MULTI-STEP state transitions (zero or more). +// To perform a single-step state transition, use function [State2.Set]. +func (t *State2Tracker) Evolve(target State2) (oldState State2, success bool) { + for { + oldState = t.Value() + if !oldState.CanReach(target) { + return oldState, false + } + if t.state2.CompareAndSwap(uint32(oldState), uint32(target)) { + return oldState, true + } + } +} + +// CompareAndSwap attempts an ATOMIC transition from the anticipated old state to `newState`. If the current +// state is different from `anticipatedOldState`, the transition fails. For matching current state, the +// transition succeeds if and only if it is valid as defined by [State2.IsValidTransition]. No matter +// whether the state transition succeeds (second return value), the `oldState` return value is always +// the state from which the transition was attempted (not necessarily equal to input `anticipatedOldState`). +// ATTENTION: this function performs a SINGLE-STEP state transition. (Multi-step state evolution is straight +// forward, but not yet needed for the use-case; hence not implemented). +func (t *State2Tracker) CompareAndSwap(anticipatedOldState, newState State2) (oldState State2, success bool) { + for { + oldState = t.Value() + if oldState != anticipatedOldState { + return oldState, false + } + if !oldState.IsValidTransition(newState) { + return oldState, false + } + if t.state2.CompareAndSwap(uint32(oldState), uint32(newState)) { + return oldState, true + } + } +} + +// Value returns the current state. +func (t *State2Tracker) Value() State2 { + return State2(t.state2.Load()) +} diff --git a/module/executiondatasync/optimistic_sync/state_test.go b/module/executiondatasync/optimistic_sync/state_test.go new file mode 100644 index 00000000000..b39789a0ad7 --- /dev/null +++ b/module/executiondatasync/optimistic_sync/state_test.go @@ -0,0 +1,541 @@ +package optimistic_sync + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var allValidState2Values = []State2{ + State2Pending, // 0 + State2Downloading, // 1 + State2Indexing, // 2 + State2WaitingPersist, // 3 + State2Persisting, // 4 + State2Complete, // 5 + State2Abandoned, // 6 +} + +// allState2Values includes all valid State2 values plus some invalid ones +// permitted by the underlying uint32 type. +var allState2Values = append(allValidState2Values, + // invalid (but permitted by the underlying uint32 type): + State2(State2Abandoned+1), + State2(999), +) + +// TestState2_IsValid tests the IsValid method for State2 +func TestState2_IsValid(t *testing.T) { + // Valid states + assert.True(t, State2Pending.IsValid()) + assert.True(t, State2Downloading.IsValid()) + assert.True(t, State2Indexing.IsValid()) + assert.True(t, State2WaitingPersist.IsValid()) + assert.True(t, State2Persisting.IsValid()) + assert.True(t, State2Complete.IsValid()) + assert.True(t, State2Abandoned.IsValid()) + + // invalid states + assert.False(t, State2(State2Abandoned+1).IsValid()) + assert.False(t, State2(999).IsValid()) +} + +// TestState2_String tests the String method for State2 +func TestState2_String(t *testing.T) { + assert.Equal(t, "pending", State2Pending.String()) + assert.Equal(t, "downloading", State2Downloading.String()) + assert.Equal(t, "indexing", State2Indexing.String()) + assert.Equal(t, "waiting2persist", State2WaitingPersist.String()) + assert.Equal(t, "persisting", State2Persisting.String()) + assert.Equal(t, "complete", State2Complete.String()) + assert.Equal(t, "abandoned", State2Abandoned.String()) + + assert.Equal(t, "unknown", State2(State2Abandoned+1).String()) + assert.Equal(t, "unknown", State2(999).String()) +} + +// TestState2_IsTerminal tests the IsTerminal method for State2 +func TestState2_IsTerminal(t *testing.T) { + // Valid states that are not terminal states + assert.False(t, State2Pending.IsTerminal()) + assert.False(t, State2Downloading.IsTerminal()) + assert.False(t, State2Indexing.IsTerminal()) + assert.False(t, State2WaitingPersist.IsTerminal()) + assert.False(t, State2Persisting.IsTerminal()) + + // Valid terminal states + assert.True(t, State2Complete.IsTerminal()) + assert.True(t, State2Abandoned.IsTerminal()) + + // Also invalid states should not be considered terminal + assert.False(t, State2(State2Abandoned+1).IsTerminal()) + assert.False(t, State2(999).IsTerminal()) +} + +// TestState2_IsValidTransition tests the [State2.IsValidTransition] method. +// Valid transitions are defined as follows: +// +// ┏━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━┓ ┏━━━━━━━━━━━┓ +// ┃ Pending ┃───►┃ Downloading ┃───►┃ Indexing ┃───►┃ WaitingPersist ┃───►┃ Persisting ┃───►┃ Complete ┃ +// ┗━━━━━┯━━━━━┛ ┗━━━━━━┯━━━━━━━┛ ┗━━━━━┯━━━━━┛ ┗━━━━━━━┯━━━━━━━━┛ ┗━━━━━━━━━━━━┛ ┗━━━━━━━━━━━┛ +// │ │ │ │ ┏━━━━━━━━━━━┓ +// └─────────────────┴──────────────────┴──────────────────┴──────►┃ Abandoned ┃ +// ┗━━━━━━━━━━━┛ +// +// NOTE: Persisting can ONLY transition to Complete (not Abandoned). Once we start persisting, +// we cannot abort the operation. This is an intentional design choice. +// +// Within this test, we also verify that: +// * Our state transition function is reflexive, i.e. self-transitions from the state to itself are allowed (no-op) for valid states +// * Backward transitions are not allowed (e.g., Indexing -> Downloading). +// * Skipping states is not allowed (e.g., Pending -> Indexing). +// * Terminal states (Complete and Abandoned) cannot transition to any other state. +func TestState2_IsValidTransition(t *testing.T) { + + t.Run("state transitions from `State2Pending`", func(t *testing.T) { + from := State2Pending + // Pending can transition to: itself (reflexive), Downloading (happy path), Abandoned (abort) + for _, toValid := range []State2{State2Pending, State2Downloading, State2Abandoned} { + assert.True(t, from.IsValidTransition(toValid), "%s -> %s should be valid", from, toValid) + } + for _, toInvalid := range []State2{State2Indexing, State2WaitingPersist, State2Persisting, State2Complete, State2Abandoned + 1, 999} { + assert.False(t, from.IsValidTransition(toInvalid), "%s -> %s should be invalid", from, toInvalid) + } + }) + + t.Run("state transitions from `State2Downloading`", func(t *testing.T) { + from := State2Downloading + // Downloading can transition to: itself (reflexive), Indexing (happy path), Abandoned (abort) + for _, toValid := range []State2{State2Downloading, State2Indexing, State2Abandoned} { + assert.True(t, from.IsValidTransition(toValid), "%s -> %s should be valid", from, toValid) + } + for _, toInvalid := range []State2{State2Pending, State2WaitingPersist, State2Persisting, State2Complete, State2Abandoned + 1, 999} { + assert.False(t, from.IsValidTransition(toInvalid), "%s -> %s should be invalid", from, toInvalid) + } + }) + + t.Run("state transitions from `State2Indexing`", func(t *testing.T) { + from := State2Indexing + // State2Indexing can transition to: itself (reflexive), WaitingPersist (happy path), Abandoned (abort) + for _, toValid := range []State2{State2Indexing, State2WaitingPersist, State2Abandoned} { + assert.True(t, from.IsValidTransition(toValid), "%s -> %s should be valid", from, toValid) + } + for _, toInvalid := range []State2{State2Pending, State2Downloading, State2Persisting, State2Complete, State2Abandoned + 1, 999} { + assert.False(t, from.IsValidTransition(toInvalid), "%s -> %s should be invalid", from, toInvalid) + } + }) + + t.Run("state transitions from `State2WaitingPersist`", func(t *testing.T) { + from := State2WaitingPersist + // WaitingPersist can transition to: itself (reflexive), Persisting (happy path), Abandoned (abort) + for _, toValid := range []State2{State2WaitingPersist, State2Persisting, State2Abandoned} { + assert.True(t, from.IsValidTransition(toValid), "%s -> %s should be valid", from, toValid) + } + for _, toInvalid := range []State2{State2Pending, State2Downloading, State2Indexing, State2Complete, State2Abandoned + 1, 999} { + assert.False(t, from.IsValidTransition(toInvalid), "%s -> %s should be invalid", from, toInvalid) + } + }) + + t.Run("state transitions from `State2Persisting`", func(t *testing.T) { + from := State2Persisting + // Persisting can ONLY transition to: itself (reflexive) and Complete (happy path) + // NOTE: Persisting CANNOT transition to Abandoned - this is intentional. + // Once we start the database write, we have committed to the specific result we are persisting. + for _, toValid := range []State2{State2Persisting, State2Complete} { + assert.True(t, from.IsValidTransition(toValid), "%s -> %s should be valid", from, toValid) + } + for _, toInvalid := range []State2{State2Pending, State2Downloading, State2Indexing, State2WaitingPersist, State2Abandoned, State2Abandoned + 1, 999} { + assert.False(t, from.IsValidTransition(toInvalid), "%s -> %s should be invalid", from, toInvalid) + } + }) + + t.Run("states `Complete` and `Abandoned` are terminal", func(t *testing.T) { + // terminal states only allow self-transitions + for _, to := range allState2Values { + assert.Equal(t, State2Complete == to, State2Complete.IsValidTransition(to), "validity status of %s -> %s does not match expectation", State2Complete, to) + assert.Equal(t, State2Abandoned == to, State2Abandoned.IsValidTransition(to), "validity status of %s -> %s does not match expectation", State2Abandoned, to) + } + }) + + t.Run("states transitions involving invalid states should always be considered invalid", func(t *testing.T) { + for _, invalid := range []State2{State2Abandoned + 1, 999} { + for _, s := range allState2Values { + assert.False(t, s.IsValidTransition(invalid), "%s -> %s should be invalid (target is invalid state)", s, invalid) + assert.False(t, invalid.IsValidTransition(s), "%s -> %s should be invalid (source is invalid state)", invalid, s) + } + } + }) +} + +// TestState2_CanReach tests [State2.CanReach] method. +// For a state machine, "reachability" is defined as the reflexive transitive closure of the state +// transition function, whose correctness we have verified in the previous test [State2.IsValidTransition]. +// +// Within this test, we also verify that: +// * Our state transition function is reflexive, i.e. self-transitions from the state to itself are allowed (no-op) for valid states +// * Backward transitions are not allowed (e.g., Indexing -> Downloading). +// * Terminal states (Complete and Abandoned) cannot transition to any other state. +// * Any pairs including invalid states are not reachable (including self-reachability). +func TestState2_CanReach(t *testing.T) { + // We fix a `destination` state and ask: From which `start` states is this `destination` state reachable? For each destination + // state, we store all possible start state we have found so far in the map `r: destination -> Set of known start states`. + r := constructReachability() + + t.Run("exhaustively iterate over all pairs of states and check `CanReach` function", func(t *testing.T) { + for _, start := range allState2Values { + for _, dest := range allState2Values { + if !start.IsValid() || !dest.IsValid() { // if either a or b are invalid states, reachability must be false + assert.False(t, start.CanReach(dest), "starting from state '%s', state '%s' should not be reachable, as one or both states are invalid", start, dest) + } else { // both `start` and `dest` are valid states + _, expectedReachable := r[dest][start] // is `b` in the set of known start states from which `a` is reachable? + assert.Equal(t, expectedReachable, start.CanReach(dest), "starting from state '%s', reachability of state '%s' is incorrect", start, dest) + } + } + } + }) +} + +// TestState2_IsValid_isValidInitialState verifies that the internal method +// `isValidInitialState` only considers State2Pending as valid initial state. +func TestState2_IsValid_isValidInitialState(t *testing.T) { + for _, initState := range allState2Values { + assert.Equal(t, initState == State2Pending, initState.isValidInitialState(), "%s should not be considered a valid initial state", initState) + } +} + +// TestNewState2Tracker_RejectsInvalidStartingStates verifies that the State Tracker +// can only be initialized with State2Pending. +func TestState2Tracker_RejectsInvalidStartingStates(t *testing.T) { + t.Run("Happy path: State Tracker initialized in state pending", func(t *testing.T) { + tracker, err := NewState2Tracker(State2Pending) + require.NoError(t, err) + require.NotNil(t, tracker) + assert.Equal(t, State2Pending, tracker.Value()) + }) + + t.Run("Invalid states for initializing State Tracker", func(t *testing.T) { + for _, initState := range allState2Values { + if initState == State2Pending { // single state that should be permitted as starting state + continue + } + tracker, err := NewState2Tracker(initState) + assert.ErrorIs(t, err, ErrInvalidPipelineState) + assert.Nil(t, tracker) + } + }) +} + +// initializeAndTransitionTo_HappyPath is a utility method. It initializes a State2Tracker +// and transitions it to the specified target state along the HAPPY PATH: +// +// Pending → Downloading → Indexing → WaitingPersist → Persisting → Complete +// +// For Abandoned state, the shortest route (Pending → Abandoned) is used. +func initializeAndTransitionTo_HappyPath(t *testing.T, target State2) *State2Tracker { + // recursively initialize and forward + var prior State2 + switch target { + case State2Pending: // base case + tracker, err := NewState2Tracker(State2Pending) + require.NoError(t, err) + assert.Equal(t, State2Pending, tracker.Value()) + return tracker + case State2Downloading: // Pending -> Downloading + prior = State2Pending + case State2Indexing: // Downloading -> Indexing + prior = State2Downloading + case State2WaitingPersist: // Indexing -> WaitingPersist + prior = State2Indexing + case State2Persisting: // WaitingPersist -> Persisting + prior = State2WaitingPersist + case State2Complete: // Persisting -> Complete + prior = State2Persisting + case State2Abandoned: // can be reached from any state except `Complete` and `Persisting`; we choose the shortest route + prior = State2Pending + default: + t.Fatalf("invalid target state %s", target) + } + + tracker := initializeAndTransitionTo_HappyPath(t, prior) + assert.Equal(t, prior, tracker.Value()) + oldState, success := tracker.Set(target) + require.True(t, success, "unexpected failure of state transition %s to %s", prior, target) + require.Equal(t, prior, oldState, "unexpected behaviour: prior state should be %s but got %s", prior, oldState) + newState := tracker.Value() + assert.Equal(t, target, newState, "unexpected behaviour: tracker should be in state %s but got %s", target, newState) + return tracker +} + +// TestState2Tracker_HappyPath tests the complete happy path through all states +func TestState2Tracker_HappyPath(t *testing.T) { + for _, target := range allValidState2Values { + initializeAndTransitionTo_HappyPath(t, target) + } +} + +// TestState2Tracker_Set exhaustively checks all possible combinations of state transitions +// (valid and invalid) via the [State2Tracker.Set] method. +// +// Within these tests, we also verify that: +// - Our state transition function is reflexive, i.e. self-transitions from the state to itself are allowed (no-op) for valid states. +// - Backward transitions are not allowed (e.g., Indexing -> Downloading). +// - Skipping states is not allowed (e.g., Pending -> Indexing). +// - Terminal states (Complete and Abandoned) cannot transition to any other state. +// - Transitioning to invalid states is not allowed. (we can't test transitions from invalid states, since the +// State2Tracker can only be initialized in valid states and only allows valid state transitions). +func TestState2Tracker_Set(t *testing.T) { + // verifyStateTransition is a helper to verify a single state transition via [State2Tracker.Set] method: + // it supports both expected success and expected failure cases. + verifyStateTransition := func(t *testing.T, tracker *State2Tracker, expectedOldState State2, to State2, expectedSuccess bool) { + oldState, success := tracker.Set(to) + newState := tracker.Value() + assert.Equal(t, expectedOldState, oldState, "Expected the old state to be reported as %s but got %s", expectedOldState, oldState) + + if expectedSuccess { + t.Run(fmt.Sprintf("verifying %s -> %s succeeds", expectedOldState, to), func(t *testing.T) { + assert.True(t, success, "Expected state transition to be successful") + assert.Equal(t, to, newState, "Expected the state to be updated to %s but got %s", to, newState) + }) + } else { + t.Run(fmt.Sprintf("verifying %s -> %s fails", expectedOldState, to), func(t *testing.T) { + assert.False(t, success, "Expected state transition to fail") + // since state transition failed, state should remain unchanged + assert.Equal(t, expectedOldState, newState, "Expected the state to remain at %s but got %s", expectedOldState, newState) + }) + } + } + + t.Run("transitions from Pending", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Pending) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := State2Pending.IsValidTransition(to) // assumed to be correct, as tested above + verifyStateTransition(t, tracker, State2Pending, to, expectedValidity) + } + }) + + // We can only start from Pending, so we need to manually transition to other states + // to test transitions from those states. + + t.Run("transitions from Downloading", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Downloading) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := State2Downloading.IsValidTransition(to) // assumed to be correct, as tested above + verifyStateTransition(t, tracker, State2Downloading, to, expectedValidity) + } + }) + + t.Run("transitions from Indexing", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Indexing) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := State2Indexing.IsValidTransition(to) // assumed to be correct, as tested above + verifyStateTransition(t, tracker, State2Indexing, to, expectedValidity) + } + }) + + t.Run("transitions from WaitingPersist", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2WaitingPersist) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := State2WaitingPersist.IsValidTransition(to) // assumed to be correct, as tested above + verifyStateTransition(t, tracker, State2WaitingPersist, to, expectedValidity) + } + }) + + t.Run("transitions from Persisting", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Persisting) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := State2Persisting.IsValidTransition(to) // assumed to be correct, as tested above + verifyStateTransition(t, tracker, State2Persisting, to, expectedValidity) + } + }) + + t.Run("transitions from Complete (terminal)", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Complete) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := to == State2Complete // Complete is terminal, so all transitions except self-transition should fail + verifyStateTransition(t, tracker, State2Complete, to, expectedValidity) + } + }) + + t.Run("transitions from Abandoned (terminal)", func(t *testing.T) { + for _, to := range allState2Values { + tracker := initializeAndTransitionTo_HappyPath(t, State2Abandoned) // correctness verified in prior test `TestState2Tracker_HappyPath` + expectedValidity := to == State2Abandoned // Abandoned is terminal, so all transitions except self-transition should fail + verifyStateTransition(t, tracker, State2Abandoned, to, expectedValidity) + } + }) +} + +// TestState2Tracker_Evolve exhaustively checks all possible combinations of `start` and `destination` states +// (valid and invalid) and attempts to evolve the state via the [State2Tracker.Evolve] method. +// +// Within these tests, we also verify that: +// - Our state transition function is reflexive, i.e. self-transitions from the state to itself are allowed (no-op) for valid states. +// - Backward transitions are not allowed (e.g., Indexing -> Downloading). +// - Skipping states is not allowed (e.g., Pending -> Indexing). +// - Terminal states (Complete and Abandoned) cannot transition to any other state. +// - Transitioning to invalid states is not allowed. (we can't test transitions from invalid states, since the +// State2Tracker can only be initialized in valid states and only allows valid state transitions). +func TestState2Tracker_Evolve(t *testing.T) { + // verifyStateEvolution is a helper to verify an atomic state evolution executed via [State2Tracker.Evolve] method. + // It supports both expected success and expected failure cases. Process: + // 1. A new tracker is initialized and transitioned to `currentTrackerState` along the happy path (utilizes functionality, whose + // correctness we have confirmed in prior tests). This generates the starting conditions for the `Evolve` operation we are testing. + // 2. The `Evolve` operation is performed, requesting a state evolution to the `target` state. + // 3. We compare the reported old state, success return as well as the resulting state against the + // reference implementation of "reachability" defined by the [State2.CanReach] method (whose correctness we have confirmed in + // prior tests) + // Notes: + // - `currentTrackerState` should be set to *valid* states only, since the State tracker should prevent reaching invalid states. + verifyStateEvolution := func(t *testing.T, currentTrackerState Start, requestedEvolutionTarget Destination) { + tracker := initializeAndTransitionTo_HappyPath(t, currentTrackerState) // only succeeds for valid states + require.Equal(t, currentTrackerState, tracker.Value()) // sanity check + + // evolve state via method [State2Tracker.Evolve] + oldState, success := tracker.Evolve(requestedEvolutionTarget) + newState := tracker.Value() + assert.Equal(t, currentTrackerState, oldState, "Expected the old state to be reported as %s but got %s", currentTrackerState, oldState) + expectedSuccess := currentTrackerState.CanReach(requestedEvolutionTarget) + + if expectedSuccess { + t.Run(fmt.Sprintf("verifying evolution from '%s' to '%s' succeeds", currentTrackerState, requestedEvolutionTarget), func(t *testing.T) { + assert.True(t, success, "Expected state evolution to be successful") + assert.Equal(t, requestedEvolutionTarget, newState, "Expected the state to be evolved to '%s' but got '%s'", requestedEvolutionTarget, newState) + }) + } else { + t.Run(fmt.Sprintf("verifying evolution from '%s' to '%s' fails", currentTrackerState, requestedEvolutionTarget), func(t *testing.T) { + assert.False(t, success, "Expected state transition to fail") + // since state evolution failed, state should remain unchanged + assert.Equal(t, currentTrackerState, newState, "Expected the state to remain at '%s' but got '%s'", currentTrackerState, newState) + }) + } + } + + for _, currentTrackerState := range allValidState2Values { + t.Run(fmt.Sprintf("starting from tracker state '%s', exhaustively checking all target states", currentTrackerState), func(t *testing.T) { + for _, requestedEvolutionTarget := range allState2Values { + verifyStateEvolution(t, currentTrackerState, requestedEvolutionTarget) + } + }) + } +} + +// TestState2Tracker_Set exhaustively checks all possible combinations of state transitions +// (valid and invalid) via the [State2Tracker.CompareAndSwap] method. +// +// Within these tests, we also verify that: +// * Our state transition function is reflexive, i.e. self-transitions from the state to itself are allowed (no-op) for valid states +// * Backward transitions are not allowed (e.g., Indexing -> Downloading). +// * Skipping states is not allowed (e.g., Pending -> Indexing). +// * Terminal states (Complete and Abandoned) cannot transition to any other state. +// * Transitioning to invalid states is not allowed. +func TestState2Tracker_CompareAndSwap(t *testing.T) { + type casRequest struct{ from, to State2 } // syntactic sugar for better readability of the tests below + + // verifyStateTransition is a helper to verify a atomic state transition via [State2Tracker.CompareAndSwap] method. + // It supports both expected success and expected failure cases. Process: + // 1. A new tracker is initialized and transitioned to `currentTrackerState` along the happy path (utilizes functionality, + // whose correctness we have confirmed in prior tests). This generates the starting conditions for the CAS operation. + // 2. The CAS operation is performed, requesting a state transition `casRequest.from` -> `casRequest.to`. Note that `casRequest.from` + // might be different from `currentTrackerState`. Thereby, we can test both valid and invalid requests for CAS operations. + // 3. The criteria for success/failure of the CAS operation are: + // (i) the trackers `currentTrackerState` must be equal to the `casRequest.from` state requested by the CAS operation, and + // (ii) the transition `casRequest.from` -> `casRequest.to` must be valid according to [State2.IsValidTransition]. + // In this test, we assume that [State2.IsValidTransition] is correct, which is verified in a prior test. + // Notes: + // - `currentTrackerState` should be set to *valid* states only, since the State tracker should prevent reaching invalid states. + verifyStateTransition := func(t *testing.T, currentTrackerState State2, casRequest casRequest) { + tracker := initializeAndTransitionTo_HappyPath(t, currentTrackerState) // only succeeds for valid starting states + require.Equal(t, currentTrackerState, tracker.Value()) // sanity check + + // perform CAS operation + oldState, success := tracker.CompareAndSwap(casRequest.from, casRequest.to) + newState := tracker.Value() + assert.Equal(t, currentTrackerState, oldState, "Expected the old state to be reported as %s but got %s", casRequest.from, oldState) + + // check CAS result + expectedSuccess := (currentTrackerState == casRequest.from) && casRequest.from.IsValidTransition(casRequest.to) // criteria (i) and (ii) determining expected success + if expectedSuccess { + t.Run(fmt.Sprintf("verifying that on state '%s', CAS '%s' -> '%s' succeeds", currentTrackerState, casRequest.from, casRequest.to), func(t *testing.T) { + assert.True(t, success, "Expected state transition to be successful") + assert.Equal(t, casRequest.to, newState, "Expected the state to be updated to '%s' but got '%s'", casRequest.to, newState) + }) + } else { + t.Run(fmt.Sprintf("verifying that on state '%s', CAS '%s' -> '%s' fails", currentTrackerState, casRequest.from, casRequest.to), func(t *testing.T) { + assert.False(t, success, "Expected state transition to fail") + // since state transition failed, state should remain unchanged + assert.Equal(t, currentTrackerState, newState, "Expected the state to remain at '%s' but got '%s'", casRequest.from, newState) + }) + } + } + + for _, currentTrackerState := range allValidState2Values { + t.Run(fmt.Sprintf("starting from tracker state '%s', exhaustively checking all pairs of CompareAndSwap requests", currentTrackerState), func(t *testing.T) { + for _, casFrom := range allState2Values { + for _, casTo := range allState2Values { + verifyStateTransition(t, currentTrackerState, casRequest{from: casFrom, to: casTo}) + } + } + }) + } +} + +/* ********************************* Test Helpers ********************************* */ + +type Destination = State2 // syntactic sugar for better readability +type Start = State2 // syntactic sugar for better readability + +type ReachabilityReferenceImplementation map[Destination]map[Start]struct{} + +// constructReachability evaluates expected reachability between pairs of states. +// While reachability in the production implementation is hard-coded for efficiency, this implementation here is for +// testing purposes only following an orthogonal: In a nutshell, we utilize that reachability is the reflexive transitive +// closure of the state transition function, whose correctness we have verified in the test [State2.IsValidTransition]. +func constructReachability() ReachabilityReferenceImplementation { + // We fix a `destination` state and ask: From which `start` states is this `destination` state reachable? For + // each destination state, we store a tentative notion of all possible start state we have found so far in the + // map `r: destination -> Set of known start states`. + + // Step 1: We begin with the start states that can reach the destination in one step (i.e. valid transitions). + r := make(ReachabilityReferenceImplementation) // mapping destination -> Set of known start states from which `destination` is reachable + for _, start := range allState2Values { + for _, destination := range allState2Values { + if start.IsValidTransition(destination) { + starts := r[destination] // might return zero value, i.e. nil slice (works with all operations below) + if starts == nil { + starts = make(map[Start]struct{}) + } + starts[start] = struct{}{} + r[destination] = starts + } + } + } + + // Step 2: we iteratively expand the set of known start states by adding all states that can reach any of the + // already known start states in one step. This process is repeated until no new start states were found. + for { + foundNewStartingStates := false + // We know that we have each possible destination state already in the map `r` from Step 1, because each state can at least + // reach itself (reflexivity). Furthermore, for each destination state, the state itself is in the set of known start state. + // Hence, we neither have to worry about missing destination states in `r`, nor about the set of known start states being nil. + for _, starts := range r { + for start := range starts { // iterates over all known start states, from which `destination` can be reached + // Check _all_ possible states `s`, we check if the transition `s` -> `start` is valid. If + // yes, then `s` can also reach `destination` (via start), so we add `s` to the set `r[destination]`. + for _, s := range allState2Values { + _, isKnownStartingState := starts[s] + if isKnownStartingState || !s.IsValidTransition(start) { // if s is already a known starting state or s -> start is not a valid transition, skip + continue + } + starts[s] = struct{}{} + foundNewStartingStates = true + } + } + } + if !foundNewStartingStates { + // no new start states were found: we have constructed the transitive closure of the state transition function (definition of reachability) + return r + } + } +}