Skip to content

Commit

Permalink
Merge pull request onflow#5444 from onflow/petera/backport-fix-event-…
Browse files Browse the repository at this point in the history
…tx-order

[Access] Sort events returned from index in tx index order - backport
  • Loading branch information
peterargue authored Feb 23, 2024
2 parents e40084b + 8a6bbaf commit e1e2ddd
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 4 deletions.
13 changes: 12 additions & 1 deletion engine/access/rpc/backend/backend_events_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package backend

import (
"bytes"
"context"
"fmt"
"sort"
"testing"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -115,10 +117,19 @@ func (s *BackendEventsSuite) SetupTest() {
s.blockEvents = generator.GetEventsWithEncoding(10, entities.EventEncodingVersion_CCF_V0)
targetEvent = string(s.blockEvents[0].Type)

// events returned from the db are sorted by txID, txIndex, then eventIndex.
// reproduce that here to ensure output order works as expected
returnBlockEvents := make([]flow.Event, len(s.blockEvents))
copy(returnBlockEvents, s.blockEvents)

sort.Slice(returnBlockEvents, func(i, j int) bool {
return bytes.Compare(returnBlockEvents[i].TransactionID[:], returnBlockEvents[j].TransactionID[:]) < 0
})

s.events.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) ([]flow.Event, error) {
for _, headerID := range s.blockIDs {
if blockID == headerID {
return s.blockEvents, nil
return returnBlockEvents, nil
}
}
return nil, storage.ErrNotFound
Expand Down
82 changes: 82 additions & 0 deletions engine/access/rpc/backend/event_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package backend

import (
"bytes"
"math"
"sort"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
storagemock "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
)

// TestGetEvents tests that GetEvents returns the events in the correct order
func TestGetEvents(t *testing.T) {
expectedEvents := make(flow.EventsList, 0, 6)
expectedEvents = append(expectedEvents, generateTxEvents(unittest.IdentifierFixture(), 0, 1)...)
expectedEvents = append(expectedEvents, generateTxEvents(unittest.IdentifierFixture(), 1, 3)...)
expectedEvents = append(expectedEvents, generateTxEvents(unittest.IdentifierFixture(), 2, 2)...)

storedEvents := make([]flow.Event, len(expectedEvents))
copy(storedEvents, expectedEvents)

// sort events in storage order (by tx ID)
sort.Slice(storedEvents, func(i, j int) bool {
cmp := bytes.Compare(storedEvents[i].TransactionID[:], storedEvents[j].TransactionID[:])
if cmp == 0 {
if storedEvents[i].TransactionIndex == storedEvents[j].TransactionIndex {
return storedEvents[i].EventIndex < storedEvents[j].EventIndex
}
return storedEvents[i].TransactionIndex < storedEvents[j].TransactionIndex
}
return cmp < 0
})

events := storagemock.NewEvents(t)
header := unittest.BlockHeaderFixture()

events.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) ([]flow.Event, error) {
return storedEvents, nil
})

eventsIndex := NewEventsIndex(events)
err := eventsIndex.Initialize(&mockIndexReporter{})
require.NoError(t, err)

actualEvents, err := eventsIndex.GetEvents(header.ID(), header.Height)
require.NoError(t, err)

// output events should be in the same order as the expected events
assert.Len(t, actualEvents, len(expectedEvents))
for i, event := range actualEvents {
assert.Equal(t, expectedEvents[i], event)
}
}

func generateTxEvents(txID flow.Identifier, txIndex uint32, count int) flow.EventsList {
events := make(flow.EventsList, count)
for i := 0; i < count; i++ {
events[i] = flow.Event{
Type: unittest.EventTypeFixture(flow.Localnet),
TransactionID: txID,
TransactionIndex: txIndex,
EventIndex: uint32(i),
}
}
return events
}

type mockIndexReporter struct{}

func (r *mockIndexReporter) LowestIndexedHeight() (uint64, error) {
return 0, nil
}

func (r *mockIndexReporter) HighestIndexedHeight() (uint64, error) {
return math.MaxUint64, nil
}
17 changes: 16 additions & 1 deletion engine/access/rpc/backend/events_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"fmt"
"sort"

"go.uber.org/atomic"

Expand Down Expand Up @@ -37,7 +38,21 @@ func (e *EventsIndex) GetEvents(blockID flow.Identifier, height uint64) ([]flow.
return nil, err
}

return e.events.ByBlockID(blockID)
events, err := e.events.ByBlockID(blockID)
if err != nil {
return nil, err
}

// events are keyed/sorted by [blockID, txID, txIndex, eventIndex]
// we need to resort them by tx index then event index so the output is in execution order
sort.Slice(events, func(i, j int) bool {
if events[i].TransactionIndex == events[j].TransactionIndex {
return events[i].EventIndex < events[j].EventIndex
}
return events[i].TransactionIndex < events[j].TransactionIndex
})

return events, nil
}

// LowestIndexedHeight returns the lowest height indexed by the execution state indexer.
Expand Down
26 changes: 25 additions & 1 deletion engine/access/state_stream/backend/backend_events_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package backend

import (
"bytes"
"context"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -42,8 +44,30 @@ func (s *BackendEventsSuite) TestSubscribeEventsFromExecutionData() {
// extracted from local storage
func (s *BackendEventsSuite) TestSubscribeEventsFromLocalStorage() {
s.backend.useIndex = true

// events returned from the db are sorted by txID, txIndex, then eventIndex.
// reproduce that here to ensure output order works as expected
blockEvents := make(map[flow.Identifier][]flow.Event)
for _, b := range s.blocks {
events := make([]flow.Event, len(s.blockEvents[b.ID()]))
for i, event := range s.blockEvents[b.ID()] {
events[i] = event
}
sort.Slice(events, func(i, j int) bool {
cmp := bytes.Compare(events[i].TransactionID[:], events[j].TransactionID[:])
if cmp == 0 {
if events[i].TransactionIndex == events[j].TransactionIndex {
return events[i].EventIndex < events[j].EventIndex
}
return events[i].TransactionIndex < events[j].TransactionIndex
}
return cmp < 0
})
blockEvents[b.ID()] = events
}

s.events.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(
mocks.StorageMapGetter(s.blockEvents),
mocks.StorageMapGetter(blockEvents),
)

reporter := syncmock.NewIndexReporter(s.T())
Expand Down
30 changes: 29 additions & 1 deletion engine/access/state_stream/backend/backend_executiondata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *BackendExecutionDataSuite) SetupTest() {

seal := unittest.BlockSealsFixture(1)[0]
result := unittest.ExecutionResultFixture()
blockEvents := unittest.BlockEventsFixture(block.Header, (i%len(testEventTypes))*3+1, testEventTypes...)
blockEvents := generateMockEvents(block.Header, (i%len(testEventTypes))*3+1)

numChunks := 5
chunkDatas := make([]*execution_data.ChunkExecutionData, 0, numChunks)
Expand Down Expand Up @@ -228,6 +228,34 @@ func (s *BackendExecutionDataSuite) SetupTest() {
require.NoError(s.T(), err)
}

// generateMockEvents generates a set of mock events for a block split into multiple tx with
// appropriate indexes set
func generateMockEvents(header *flow.Header, eventCount int) flow.BlockEvents {
txCount := eventCount / 3

txID := unittest.IdentifierFixture()
txIndex := uint32(0)
eventIndex := uint32(0)

events := make([]flow.Event, eventCount)
for i := 0; i < eventCount; i++ {
if i > 0 && i%txCount == 0 {
txIndex++
txID = unittest.IdentifierFixture()
eventIndex = 0
}

events[i] = unittest.EventFixture(testEventTypes[i%len(testEventTypes)], txIndex, eventIndex, txID, 0)
}

return flow.BlockEvents{
BlockID: header.ID(),
BlockHeight: header.Height,
BlockTimestamp: header.Timestamp,
Events: events,
}
}

func (s *BackendExecutionDataSuite) TestGetExecutionDataByBlockID() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit e1e2ddd

Please sign in to comment.