forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
integration_test.go
215 lines (195 loc) · 7.51 KB
/
integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package follower
import (
"context"
"sync"
"testing"
"time"
"github.com/dgraph-io/badger/v2"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/mocks"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module/compliance"
moduleconsensus "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/module/trace"
moduleutil "github.com/onflow/flow-go/module/util"
"github.com/onflow/flow-go/network/mocknetwork"
pbadger "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/events"
"github.com/onflow/flow-go/state/protocol/util"
storageutil "github.com/onflow/flow-go/storage/util"
"github.com/onflow/flow-go/utils/unittest"
)
// TestFollowerHappyPath tests ComplianceEngine integrated with real modules. Mocked modules are used only for
// functionality which is static or implemented by the test case itself. It verifies that syncing batches of
// blocks from other participants results in extending the protocol state: the test passes once the finalized
// block reaches the expected height.
// We use the following setup:
//   - Number of workers - workers
//   - Number of batches submitted by each worker - batchesPerWorker
//   - Number of blocks in each batch submitted by a worker - blocksPerBatch
//
// Each worker submits batchesPerWorker*blocksPerBatch blocks, i.e. in total we submit
// workers*batchesPerWorker*blocksPerBatch distinct blocks. Workers keep re-submitting their batches until
// the expected block is finalized (or the test gives up).
func TestFollowerHappyPath(t *testing.T) {
	allIdentities := unittest.CompleteIdentitySet()
	rootSnapshot := unittest.RootSnapshotFixture(allIdentities)
	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
		// named `collector` so that the imported `metrics` package is not shadowed
		collector := metrics.NewNoopCollector()
		tracer := trace.NewNoopTracer()
		log := unittest.Logger()
		consumer := events.NewNoop()
		all := storageutil.StorageLayer(t, db)

		// bootstrap root snapshot
		state, err := pbadger.Bootstrap(
			collector,
			db,
			all.Headers,
			all.Seals,
			all.Results,
			all.Blocks,
			all.QuorumCertificates,
			all.Setups,
			all.EpochCommits,
			all.EpochProtocolStateEntries,
			all.ProtocolKVStore,
			all.VersionBeacons,
			rootSnapshot,
		)
		require.NoError(t, err)
		mockTimer := util.MockBlockTimer()

		// create follower state
		followerState, err := pbadger.NewFollowerState(
			log,
			tracer,
			consumer,
			state,
			all.Index,
			all.Payloads,
			mockTimer,
		)
		require.NoError(t, err)
		finalizer := moduleconsensus.NewFinalizer(db, all.Headers, followerState, tracer)
		rootHeader, err := rootSnapshot.Head()
		require.NoError(t, err)
		rootQC, err := rootSnapshot.QuorumCertificate()
		require.NoError(t, err)
		rootProtocolState, err := rootSnapshot.ProtocolState()
		require.NoError(t, err)
		rootProtocolStateID := rootProtocolState.ID()

		consensusConsumer := pubsub.NewFollowerDistributor()
		// use real consensus modules
		forks, err := consensus.NewForks(rootHeader, all.Headers, finalizer, consensusConsumer, rootHeader, rootQC)
		require.NoError(t, err)

		// assume all proposals are valid
		validator := mocks.NewValidator(t)
		validator.On("ValidateProposal", mock.Anything).Return(nil)

		// initialize the follower loop
		followerLoop, err := hotstuff.NewFollowerLoop(unittest.Logger(), collector, forks)
		require.NoError(t, err)

		syncCore := module.NewBlockRequester(t)
		followerCore, err := NewComplianceCore(
			unittest.Logger(),
			collector,
			collector,
			consensusConsumer,
			followerState,
			followerLoop,
			validator,
			syncCore,
			tracer,
		)
		require.NoError(t, err)

		me := module.NewLocal(t)
		nodeID := unittest.IdentifierFixture()
		me.On("NodeID").Return(nodeID).Maybe()

		net := mocknetwork.NewNetwork(t)
		con := mocknetwork.NewConduit(t)
		net.On("Register", mock.Anything, mock.Anything).Return(con, nil)

		// use real engine
		engine, err := NewComplianceLayer(
			unittest.Logger(),
			net,
			me,
			collector,
			all.Headers,
			rootHeader,
			followerCore,
			compliance.DefaultConfig(),
		)
		require.NoError(t, err)

		// don't forget to subscribe for finalization notifications
		consensusConsumer.AddOnBlockFinalizedConsumer(engine.OnFinalizedBlock)

		// start hotstuff logic and follower engine
		ctx, cancel, errs := irrecoverable.WithSignallerAndCancel(context.Background())
		// Make sure the components are also told to stop when an assertion below aborts the test early.
		// On the happy path `cancel` is invoked explicitly further down; this deferred call is then expected
		// to be a no-op (assumes `cancel` behaves like a context.CancelFunc - TODO confirm).
		defer cancel()
		followerLoop.Start(ctx)
		engine.Start(ctx)
		unittest.RequireCloseBefore(t, moduleutil.AllReady(engine, followerLoop), time.Second, "engine failed to start")

		// prepare chain of blocks, we will use a continuous chain assuming it was generated on happy path.
		workers := 5
		batchesPerWorker := 10
		blocksPerBatch := 100
		blocksPerWorker := blocksPerBatch * batchesPerWorker
		flowBlocks := unittest.ChainFixtureFrom(workers*blocksPerWorker, rootHeader)
		require.Greaterf(t, len(flowBlocks), defaultPendingBlocksCacheCapacity, "this test assumes that we operate with more blocks than cache's upper limit")

		// ensure sequential block views - that way we can easily know which block will be finalized after the test
		for i, block := range flowBlocks {
			block.Header.View = block.Header.Height
			block.SetPayload(unittest.PayloadFixture(unittest.WithProtocolStateID(rootProtocolStateID)))
			if i > 0 {
				// the parent's header was modified in the previous iteration, so re-link this block to it
				block.Header.ParentView = flowBlocks[i-1].Header.View
				block.Header.ParentID = flowBlocks[i-1].Header.ID()
			}
		}
		pendingBlocks := flowBlocksToBlockProposals(flowBlocks...)

		// Regarding the block that we expect to be finalized based on 2-chain finalization rule, we consider the last few blocks in `pendingBlocks`
		//    ... <-- X <-- Y <-- Z
		//            ╰─────────╯
		//          2-chain on top of X
		// Hence, we expect X to be finalized, which has the index `len(pendingBlocks)-3`
		// Note: the HotStuff Follower does not see block Z (as there is no QC for X proving its validity). Instead, it sees the certified block
		//  [◄(X) Y] ◄(Y)
		// where ◄(B) denotes a QC for block B
		targetBlockHeight := pendingBlocks[len(pendingBlocks)-3].Block.Header.Height

		// emulate syncing logic, where we push same blocks over and over.
		originID := unittest.IdentifierFixture()
		submittingBlocks := atomic.NewBool(true)
		// Without this, a failed assertion below would leave the workers re-submitting blocks forever.
		defer submittingBlocks.Store(false)
		var wg sync.WaitGroup
		wg.Add(workers)
		for i := 0; i < workers; i++ {
			// each worker owns a disjoint slice of `blocksPerWorker` consecutive proposals
			go func(blocks []*messages.BlockProposal) {
				defer wg.Done()
				for submittingBlocks.Load() {
					for batch := 0; batch < batchesPerWorker; batch++ {
						engine.OnSyncedBlocks(flow.Slashable[[]*messages.BlockProposal]{
							OriginID: originID,
							Message:  blocks[batch*blocksPerBatch : (batch+1)*blocksPerBatch],
						})
					}
				}
			}(pendingBlocks[i*blocksPerWorker : (i+1)*blocksPerWorker])
		}

		// wait for target block to become finalized, this might take a while.
		require.Eventually(t, func() bool {
			final, err := followerState.Final().Head()
			if err != nil {
				// This condition is not evaluated on the test goroutine, hence `require` (which calls t.FailNow)
				// must not be used here. t.Errorf is safe to call from any goroutine and still fails the test.
				t.Errorf("could not retrieve finalized head: %v", err)
				return false
			}
			return final.Height == targetBlockHeight
		}, time.Minute, time.Second, "expect to process all blocks before timeout")

		// shutdown and cleanup test
		submittingBlocks.Store(false)
		unittest.RequireReturnsBefore(t, wg.Wait, time.Second, "expect workers to stop producing")
		cancel()
		unittest.RequireCloseBefore(t, moduleutil.AllDone(engine, followerLoop), time.Second, "engine failed to stop")

		// non-blocking check: fail if any component reported an irrecoverable error during the run
		select {
		case err := <-errs:
			require.NoError(t, err)
		default:
		}
	})
}