diff --git a/storage/store/events.go b/storage/store/events.go index a3e6a476ee4..03dbfbb64ab 100644 --- a/storage/store/events.go +++ b/storage/store/events.go @@ -73,6 +73,9 @@ func (e *Events) BatchStore(lctx lockctx.Proof, blockID flow.Identifier, blockEv eventIndex++ } } + if err := validateEventOrder(combinedEvents); err != nil { + return fmt.Errorf("invalid event ordering for block %s: %w", blockID, err) + } storage.OnCommitSucceed(batch, func() { e.cache.Insert(blockID, combinedEvents) @@ -253,6 +256,65 @@ func (e *ServiceEvents) BatchRemoveByBlockID(blockID flow.Identifier, rw storage return e.cache.RemoveTx(rw, blockID) } +// validateEventOrder verifies that a flattened slice of block events is correctly +// ordered and internally consistent before being written to storage. +// +// The following invariants are enforced: +// - TransactionIndex must be monotonically non-decreasing. +// - TransactionIndex is permitted to skip values; transactions that emit no +// events are simply absent from the slice. +// - Within a single transaction, EventIndex must form a contiguous sequence +// starting at 0 (i.e. 0, 1, 2, …). +// - The first event of every new transaction must have EventIndex == 0. +// +// The function runs in O(n) time and performs no allocations. +// +// No error returns are expected during normal operation. +// Returns an error if any invariant is violated, including the offending indices. +func validateEventOrder(events []flow.Event) error { + if len(events) == 0 { + return nil + } + + if events[0].EventIndex != 0 { + return fmt.Errorf("first event must have EventIndex 0, got EventIndex %d at TransactionIndex %d", + events[0].EventIndex, events[0].TransactionIndex) + } + + prevTxIndex := events[0].TransactionIndex + nextEventIndex := uint32(1) + + for i := 1; i < len(events); i++ { + e := events[i] + + switch { + case e.TransactionIndex == prevTxIndex: + // Same transaction: EventIndex must increment by exactly 1. + if e.EventIndex != nextEventIndex { + return fmt.Errorf("event %d: TransactionIndex %d expected EventIndex %d, got %d", + i, e.TransactionIndex, nextEventIndex, e.EventIndex) + } + nextEventIndex++ + + case e.TransactionIndex > prevTxIndex: + // New transaction (skips are allowed): must start at EventIndex 0. + if e.EventIndex != 0 { + return fmt.Errorf("event %d: first event of TransactionIndex %d must have EventIndex 0, got %d", + i, e.TransactionIndex, e.EventIndex) + } + prevTxIndex = e.TransactionIndex + nextEventIndex = 1 + + default: + // TransactionIndex went backwards — never valid. + return fmt.Errorf("event %d: TransactionIndex must be non-decreasing, got %d after %d", + i, e.TransactionIndex, prevTxIndex) + } + } + + return nil +} + // sortEventsExecutionOrder sorts events by [txIndex, eventIndex] (execution order). func sortEventsExecutionOrder(events []flow.Event) { sort.Slice(events, func(i, j int) bool { diff --git a/storage/store/events_test.go b/storage/store/events_test.go index e591b3da883..b7e81f34ac4 100644 --- a/storage/store/events_test.go +++ b/storage/store/events_test.go @@ -34,14 +34,14 @@ func TestEventStoreRetrieve(t *testing.T) { evt1_2 := unittest.EventFixture( unittest.Event.WithEventType(flow.EventAccountCreated), unittest.Event.WithTransactionIndex(1), - unittest.Event.WithEventIndex(1), + unittest.Event.WithEventIndex(0), // first event of tx1 must be 0 unittest.Event.WithTransactionID(tx2ID), ) evt2_1 := unittest.EventFixture( unittest.Event.WithEventType(flow.EventAccountUpdated), unittest.Event.WithTransactionIndex(2), - unittest.Event.WithEventIndex(2), + unittest.Event.WithEventIndex(0), // first event of tx2 must be 0 unittest.Event.WithTransactionID(tx2ID), ) @@ -322,14 +322,14 @@ func TestEventStoreAndRemove(t *testing.T) { evt1_2 := unittest.EventFixture( unittest.Event.WithEventType(flow.EventAccountCreated), unittest.Event.WithTransactionIndex(1), - unittest.Event.WithEventIndex(1), + unittest.Event.WithEventIndex(0), // fixed unittest.Event.WithTransactionID(tx2ID), ) evt2_1 := unittest.EventFixture( unittest.Event.WithEventType(flow.EventAccountUpdated), unittest.Event.WithTransactionIndex(2), - unittest.Event.WithEventIndex(2), + unittest.Event.WithEventIndex(0), // fixed unittest.Event.WithTransactionID(tx2ID), ) @@ -363,3 +363,127 @@ func TestEventStoreAndRemove(t *testing.T) { require.Len(t, event, 0) }) } + +// TestValidateEventOrder verifies that validateEventOrder correctly accepts valid +// event sequences and rejects all invalid orderings. +func TestValidateEventOrder(t *testing.T) { + lockManager := storage.NewTestingLockManager() + + // helper to build a minimal event with just the index fields set + makeEvent := func(txIndex, eventIndex uint32) flow.Event { + return flow.Event{ + TransactionIndex: txIndex, + EventIndex: eventIndex, + } + } + + t.Run("empty slice is valid", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, []flow.EventsList{}, rw) + }) + }) + require.NoError(t, err) + }) + }) + + t.Run("valid single transaction", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + events := []flow.EventsList{{makeEvent(0, 0), makeEvent(0, 1), makeEvent(0, 2)}} + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.NoError(t, err) + }) + }) + + t.Run("valid with skipped transaction indices", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + // tx 0 and tx 5 have events; tx 1,2,3,4 emitted nothing — valid skip + events := []flow.EventsList{ + {makeEvent(0, 0), makeEvent(0, 1)}, + {makeEvent(5, 0), makeEvent(5, 1)}, + } + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.NoError(t, err) + }) + }) + + t.Run("invalid: first event has EventIndex != 0", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + events := []flow.EventsList{{makeEvent(0, 1)}} // starts at 1, not 0 + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.Error(t, err) + }) + }) + + t.Run("invalid: non-contiguous EventIndex within same transaction", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + // jumps from EventIndex 0 to 2, skipping 1 + events := []flow.EventsList{{makeEvent(0, 0), makeEvent(0, 2)}} + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.Error(t, err) + }) + }) + + t.Run("invalid: new transaction starts with EventIndex != 0", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + // tx 1 starts at EventIndex 1 instead of 0 + events := []flow.EventsList{ + {makeEvent(0, 0)}, + {makeEvent(1, 1)}, + } + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.Error(t, err) + }) + }) + + t.Run("invalid: decreasing TransactionIndex", func(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + s := store.NewEvents(metrics.NewNoopCollector(), db) + blockID := unittest.IdentifierFixture() + // tx 3 appears after tx 5 — goes backwards + events := []flow.EventsList{ + {makeEvent(5, 0)}, + {makeEvent(3, 0)}, + } + err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return s.BatchStore(lctx, blockID, events, rw) + }) + }) + require.Error(t, err) + }) + }) +}