diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 601be3b7741..ad9661a153f 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -77,8 +77,13 @@ jobs: Bench tests were run a total of ${{ steps.settings.outputs.benchmark_repetitions }} times on each branch. - ## Results + <details> + <summary> + Collapsed results for better readability + </summary> + ${{ env.BENCHSTAT }} + </details>
+ edit-mode: replace diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c5bea57765..0407779e4d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -65,7 +65,7 @@ jobs: max_attempts: 3 command: make ci - name: Upload coverage report - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v2 with: file: ./coverage.txt flags: unittests diff --git a/Makefile b/Makefile index da8500d57cd..2a838b7d811 100644 --- a/Makefile +++ b/Makefile @@ -332,7 +332,7 @@ tool-transit: docker-build-bootstrap-transit .PHONY: docker-build-loader docker-build-loader: - docker build -f ./integration/loader/Dockerfile --build-arg TARGET=./cmd/loader --target production \ + docker build -f ./integration/loader/Dockerfile --build-arg TARGET=./loader --target production \ --label "git_commit=${COMMIT}" --label "git_tag=${IMAGE_TAG}" \ -t "$(CONTAINER_REGISTRY)/loader:latest" -t "$(CONTAINER_REGISTRY)/loader:$(SHORT_COMMIT)" -t "$(CONTAINER_REGISTRY)/loader:$(IMAGE_TAG)" . diff --git a/cmd/bootstrap/cmd/finalize.go b/cmd/bootstrap/cmd/finalize.go index cfab58e3b22..49f809e9f73 100644 --- a/cmd/bootstrap/cmd/finalize.go +++ b/cmd/bootstrap/cmd/finalize.go @@ -168,6 +168,8 @@ func finalize(cmd *cobra.Command, args []string) { votes := readRootBlockVotes() log.Info().Msg("") + log.Info().Msgf("received votes total: %v", len(votes)) + log.Info().Msg("reading dkg data") dkgData := readDKGData() log.Info().Msg("") diff --git a/cmd/bootstrap/run/qc.go b/cmd/bootstrap/run/qc.go index 1f578d1ea1e..43430870bb9 100644 --- a/cmd/bootstrap/run/qc.go +++ b/cmd/bootstrap/run/qc.go @@ -73,6 +73,11 @@ func GenerateRootQC(block *flow.Block, votes []*model.Vote, participantData *Par } } + if createdQC == nil { + return nil, fmt.Errorf("QC is not created, total number of votes %v, expect to have 2/3 votes of %v participants", + len(votes), len(identities)) + } + // STEP 3: validate constructed QC val, err := createValidator(committee) if err != nil { diff --git a/cmd/util/cmd/export-json-execution-state/cmd.go b/cmd/util/cmd/export-json-execution-state/cmd.go index e72efb545a1..0af67ca6ca5 100644 --- a/cmd/util/cmd/export-json-execution-state/cmd.go +++ b/cmd/util/cmd/export-json-execution-state/cmd.go @@ -4,6 +4,7 @@ import ( "bufio" "compress/gzip" "encoding/hex" + "errors" "fmt" "io" "os" @@ -13,6 +14,9 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" + "github.com/onflow/flow-go/ledger/complete/mtrie" + "github.com/onflow/flow-go/ledger/complete/mtrie/trie" + "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/pathfinder" "github.com/onflow/flow-go/ledger/complete" @@ -25,6 +29,7 @@ var ( flagOutputDir string flagStateCommitment string flagGzip bool + flagFullSearch bool ) var Cmd = &cobra.Command{ @@ -47,23 +52,28 @@ func init() { Cmd.Flags().BoolVar(&flagGzip, "gzip", true, "Write GZip-encoded") + + Cmd.Flags().BoolVar(&flagFullSearch, "full-search", false, + "Use full search (WARNING - traverse all WALs, extremely slow)") } func run(*cobra.Command, []string) { log.Info().Msg("start exporting ledger") - err := ExportLedger(flagExecutionStateDir, flagStateCommitment, flagOutputDir) + err := ExportLedger(flagExecutionStateDir, flagStateCommitment, flagOutputDir, flagFullSearch) if err != nil { log.Fatal().Err(err).Msg("cannot get export ledger") } } // ExportLedger exports ledger key value pairs at the given blockID -func ExportLedger(ledgerPath string, targetstate string, outputPath string) error { +func ExportLedger(ledgerPath string, 
targetstate string, outputPath string, fullSearch bool) error { + + noopMetrics := &metrics.NoopCollector{} diskWal, err := wal.NewDiskWAL( zerolog.Nop(), nil, - &metrics.NoopCollector{}, + noopMetrics, ledgerPath, complete.DefaultCacheSize, pathfinder.PathByteSize, @@ -76,17 +86,11 @@ func ExportLedger(ledgerPath string, targetstate string, outputPath string) erro <-diskWal.Done() }() - led, err := complete.NewLedger(diskWal, complete.DefaultCacheSize, &metrics.NoopCollector{}, log.Logger, 0) - if err != nil { - return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err) - } - var state ledger.State - // if no target state provided export the most recent state + if len(targetstate) == 0 { - state, err = led.MostRecentTouchedState() - if err != nil { - return fmt.Errorf("failed to load most recently used state: %w", err) + if fullSearch { + return fmt.Errorf("target state must be provided when using full search") } } else { st, err := hex.DecodeString(targetstate) @@ -98,6 +102,86 @@ func ExportLedger(ledgerPath string, targetstate string, outputPath string) erro return fmt.Errorf("failed to convert bytes to state: %w", err) } } + + var write func(writer io.Writer) error + + if fullSearch { + forest, err := mtrie.NewForest(complete.DefaultCacheSize, noopMetrics, func(evictedTrie *trie.MTrie) {}) + if err != nil { + return fmt.Errorf("cannot create forest: %w", err) + } + + diskWal.PauseRecord() + + sentinel := fmt.Errorf("we_got_the_trie_error") + + rootState := ledger.RootHash(state) + + err = diskWal.ReplayLogsOnly( + func(tries []*trie.MTrie) error { + err = forest.AddTries(tries) + if err != nil { + return fmt.Errorf("adding rebuilt tries to forest failed: %w", err) + } + + for _, trie := range tries { + rootHash := trie.RootHash() + if rootState.Equals(rootHash) { + return sentinel + } + } + return nil + }, + func(update *ledger.TrieUpdate) error { + rootHash, err := forest.Update(update) + if rootState.Equals(rootHash) { + return sentinel + } + return err + }, + func(rootHash ledger.RootHash) error { + forest.RemoveTrie(rootHash) + return nil + }, + ) + + if err != nil { + if !errors.Is(err, sentinel) { + return fmt.Errorf("cannot restore LedgerWAL: %w", err) + } + } + + write = func(writer io.Writer) error { + mTrie, err := forest.GetTrie(rootState) + if err != nil { + return fmt.Errorf("cannot get trie") + } + return mTrie.DumpAsJSON(writer) + } + + } else { + led, err := complete.NewLedger(diskWal, complete.DefaultCacheSize, noopMetrics, log.Logger, 0) + if err != nil { + return fmt.Errorf("cannot create ledger from write-a-head logs and checkpoints: %w", err) + } + + // if no target state provided export the most recent state + if len(targetstate) == 0 { + state, err = led.MostRecentTouchedState() + if err != nil { + return fmt.Errorf("failed to load most recently used state: %w", err) + } + } + + write = func(writer io.Writer) error { + err = led.DumpTrieAsJSON(state, writer) + if err != nil { + return fmt.Errorf("cannot dump trie as json: %w", err) + } + return nil + } + } + filename := state.String() + ".trie.jsonl" if flagGzip { filename += ".gz" @@ -120,9 +204,5 @@ func ExportLedger(ledgerPath string, targetstate string, outputPath string) erro writer = gzipWriter } - err = led.DumpTrieAsJSON(state, writer) - if err != nil { - return fmt.Errorf("cannot dump trie as json: %w", err) - } - return nil + return write(writer) } diff --git a/consensus/hotstuff/forks/finalizer/finalizer.go b/consensus/hotstuff/forks/finalizer/finalizer.go index 
8fb284e1d64..92cf79e8355 100644 --- a/consensus/hotstuff/forks/finalizer/finalizer.go +++ b/consensus/hotstuff/forks/finalizer/finalizer.go @@ -258,9 +258,6 @@ func (r *Finalizer) getNextAncestryLevel(block *model.Block) (*forks.BlockQC, er return nil, ErrPrunedAncestry } - if block.QC.View < r.lastFinalized.Block.View { - return nil, ErrPrunedAncestry - } parentVertex, parentBlockKnown := r.forest.GetVertex(block.QC.BlockID) if !parentBlockKnown { return nil, model.MissingBlockError{View: block.QC.View, BlockID: block.QC.BlockID} diff --git a/crypto/dkg_test.go b/crypto/dkg_test.go index ca5881ddeb0..91931ee21fa 100644 --- a/crypto/dkg_test.go +++ b/crypto/dkg_test.go @@ -1,3 +1,4 @@ +//go:build relic // +build relic package crypto @@ -285,7 +286,7 @@ func dkgCommonTest(t *testing.T, dkg int, n int, threshold int, test testCase) { } phase := 0 - if dkg == feldmanVSS { + if dkg == feldmanVSS { // jump to the last phase since there is only one phase for feldmanVSS phase = 2 } @@ -392,6 +393,7 @@ func dkgRunChan(proc *testDKGProcessor, } // if no message is received by the channel, call the DKG timeout case <-time.After(phaseSwitchTimeout): + proc.startSync.Wait() // avoids racing when starting isn't over yet switch phase { case 0: log.Infof("%d shares phase ended\n", proc.current) diff --git a/engine/access/rest/test_helpers.go b/engine/access/rest/test_helpers.go index 9b85f91e723..eb63376da4e 100644 --- a/engine/access/rest/test_helpers.go +++ b/engine/access/rest/test_helpers.go @@ -28,7 +28,7 @@ const ( func executeRequest(req *http.Request, backend *mock.API) (*httptest.ResponseRecorder, error) { var b bytes.Buffer logger := zerolog.New(&b) - router, err := newRouter(backend, logger, flow.Canary.Chain()) + router, err := newRouter(backend, logger, flow.Testnet.Chain()) if err != nil { return nil, err } diff --git a/engine/collection/epochmgr/factories/builder.go b/engine/collection/epochmgr/factories/builder.go index e34b12974a0..ccd4dbebce8 100644 --- a/engine/collection/epochmgr/factories/builder.go +++ b/engine/collection/epochmgr/factories/builder.go @@ -1,6 +1,8 @@ package factories import ( + "fmt" + "github.com/dgraph-io/badger/v2" "github.com/onflow/flow-go/module" @@ -46,7 +48,7 @@ func (f *BuilderFactory) Create( pool mempool.Transactions, ) (module.Builder, *finalizer.Finalizer, error) { - build := builder.NewBuilder( + build, err := builder.NewBuilder( f.db, f.trace, f.mainChainHeaders, @@ -55,6 +57,9 @@ func (f *BuilderFactory) Create( pool, f.opts..., ) + if err != nil { + return nil, nil, fmt.Errorf("could not create builder: %w", err) + } final := finalizer.NewFinalizer( f.db, diff --git a/engine/execution/computation/execution_verification_test.go b/engine/execution/computation/execution_verification_test.go index aead0f4636b..b331ddb27ec 100644 --- a/engine/execution/computation/execution_verification_test.go +++ b/engine/execution/computation/execution_verification_test.go @@ -34,7 +34,10 @@ import ( "github.com/stretchr/testify/require" ) -var chain = flow.Mainnet.Chain() +var chain = flow.Emulator.Chain() + +// In the following tests the system transaction is expected to fail, as the epoch related things are not set up properly. +// This is not relevant to the test, as only the non-system transactions are tested. 
func Test_ExecutionMatchesVerification(t *testing.T) { t.Run("empty block", func(t *testing.T) { @@ -624,6 +627,7 @@ func executeBlockAndVerifyWithParameters(t *testing.T, logger := zerolog.Nop() opts = append(opts, fvm.WithChain(chain)) + opts = append(opts, fvm.WithBlocks(&fvm.NoopBlockFinder{})) fvmContext := fvm.NewContext( @@ -657,7 +661,7 @@ func executeBlockAndVerifyWithParameters(t *testing.T, view := delta.NewView(state.LedgerGetRegister(ledger, initialCommit)) - executableBlock := unittest.ExecutableBlockFromTransactions(txs) + executableBlock := unittest.ExecutableBlockFromTransactions(chain.ChainID(), txs) executableBlock.StartState = &initialCommit computationResult, err := blockComputer.ExecuteBlock(context.Background(), executableBlock, view, programs.NewEmptyPrograms()) diff --git a/fvm/fvm_bench_test.go b/fvm/fvm_bench_test.go index 8c64ae00ecf..16bcb3351ec 100644 --- a/fvm/fvm_bench_test.go +++ b/fvm/fvm_bench_test.go @@ -198,7 +198,7 @@ func (b *BasicBlockExecutor) ServiceAccount(_ testing.TB) *TestBenchAccount { } func (b *BasicBlockExecutor) ExecuteCollections(tb testing.TB, collections [][]*flow.TransactionBody) *execution.ComputationResult { - executableBlock := unittest.ExecutableBlockFromTransactions(collections) + executableBlock := unittest.ExecutableBlockFromTransactions(b.chain.ChainID(), collections) executableBlock.StartState = &b.activeStateCommitment computationResult, err := b.blockComputer.ExecuteBlock(context.Background(), executableBlock, b.activeView, b.programCache) diff --git a/fvm/fvm_test.go b/fvm/fvm_test.go index b657b964e2a..6dad551b88b 100644 --- a/fvm/fvm_test.go +++ b/fvm/fvm_test.go @@ -2993,6 +2993,53 @@ func TestBlockContext_ExecuteTransaction_FailingTransactions(t *testing.T) { }), ) + t.Run("Transaction fails because of recipient account not existing", newVMTest().withBootstrapProcedureOptions( + fvm.WithMinimumStorageReservation(fvm.DefaultMinimumStorageReservation), + fvm.WithAccountCreationFee(fvm.DefaultAccountCreationFee), + fvm.WithStorageMBPerFLOW(fvm.DefaultStorageMBPerFLOW), + ).run( + func(t *testing.T, vm *fvm.VirtualMachine, chain flow.Chain, ctx fvm.Context, view state.View, programs *programs.Programs) { + ctx.LimitAccountStorage = true // this test requires storage limits to be enforced + + // Create an account private key. + privateKeys, err := testutil.GenerateAccountPrivateKeys(1) + require.NoError(t, err) + + // Bootstrap a ledger, creating accounts with the provided private keys and the root account. + accounts, err := testutil.CreateAccounts(vm, view, programs, privateKeys, chain) + require.NoError(t, err) + + // non-existent account + lastAddress, err := chain.AddressAtIndex((1 << 45) - 1) + require.NoError(t, err) + + balanceBefore := getBalance(vm, chain, ctx, view, accounts[0]) + + // transfer tokens to non-existent account + txBody := transferTokensTx(chain). + AddAuthorizer(accounts[0]). + AddArgument(jsoncdc.MustEncode(cadence.UFix64(1))). 
+ AddArgument(jsoncdc.MustEncode(cadence.NewAddress(lastAddress))) + + txBody.SetProposalKey(accounts[0], 0, 0) + txBody.SetPayer(accounts[0]) + + err = testutil.SignEnvelope(txBody, accounts[0], privateKeys[0]) + require.NoError(t, err) + + tx := fvm.Transaction(txBody, 0) + + err = vm.Run(ctx, tx, view, programs) + require.NoError(t, err) + + require.Equal(t, (&errors.CadenceRuntimeError{}).Code(), tx.Err.Code()) + + balanceAfter := getBalance(vm, chain, ctx, view, accounts[0]) + + require.Equal(t, balanceAfter, balanceBefore) + }), + ) t.Run("Transaction sequence number check fails and sequence number is not incremented", newVMTest().withBootstrapProcedureOptions( fvm.WithMinimumStorageReservation(fvm.DefaultMinimumStorageReservation), fvm.WithAccountCreationFee(fvm.DefaultAccountCreationFee), diff --git a/fvm/systemcontracts/system_contracts.go b/fvm/systemcontracts/system_contracts.go index 9c4af19e362..01494b3fcdc 100644 --- a/fvm/systemcontracts/system_contracts.go +++ b/fvm/systemcontracts/system_contracts.go @@ -152,7 +152,7 @@ var contractAddressesByChainID map[flow.ChainID]map[string]flow.Address var ( // stakingContractAddressMainnet is the address of the FlowIDTableStaking contract on Mainnet stakingContractAddressMainnet = flow.HexToAddress("8624b52f9ddcd04a") - // stakingContractAddressTestnet is the address of the FlowIDTableStaking contract on Testnet and Canary + // stakingContractAddressTestnet is the address of the FlowIDTableStaking contract on Testnet stakingContractAddressTestnet = flow.HexToAddress("9eca2b38b18b5dfe") ) @@ -176,7 +176,15 @@ func init() { ContractNameDKG: stakingContractAddressTestnet, } contractAddressesByChainID[flow.Testnet] = testnet - contractAddressesByChainID[flow.Canary] = testnet + + // Canary test network + // All system contracts are deployed to the service account + canary := map[string]flow.Address{ + ContractNameEpoch: flow.Canary.Chain().ServiceAddress(), + ContractNameClusterQC: flow.Canary.Chain().ServiceAddress(), + ContractNameDKG: flow.Canary.Chain().ServiceAddress(), + } + contractAddressesByChainID[flow.Canary] = canary // Transient test networks // All system contracts are deployed to the service account @@ -188,4 +196,5 @@ func init() { contractAddressesByChainID[flow.Emulator] = transient contractAddressesByChainID[flow.Localnet] = transient contractAddressesByChainID[flow.Benchnet] = transient + } diff --git a/integration/tests/epochs/epoch_join_and_leave_an_test.go b/integration/tests/epochs/epoch_join_and_leave_an_test.go index b7b7055f492..fd132940cc6 100644 --- a/integration/tests/epochs/epoch_join_and_leave_an_test.go +++ b/integration/tests/epochs/epoch_join_and_leave_an_test.go @@ -9,7 +9,7 @@ import ( ) func TestEpochJoinAndLeaveAN(t *testing.T) { - unittest.SkipUnless(t, unittest.TEST_RESOURCE_INTENSIVE, "epochs join/leave tests should be run on an machine with adequate resources") + unittest.SkipUnless(t, unittest.TEST_FLAKY, "epochs join/leave tests should be run on a machine with adequate resources") suite.Run(t, new(EpochJoinAndLeaveANSuite)) } diff --git a/integration/tests/epochs/epoch_join_and_leave_vn_test.go b/integration/tests/epochs/epoch_join_and_leave_vn_test.go index 5a2e31a779e..1aedb3d3356 100644 --- a/integration/tests/epochs/epoch_join_and_leave_vn_test.go +++ b/integration/tests/epochs/epoch_join_and_leave_vn_test.go @@ -10,7 +10,7 @@ import ( ) func TestEpochJoinAndLeaveVN(t *testing.T) { - unittest.SkipUnless(t, unittest.TEST_RESOURCE_INTENSIVE, "epochs join/leave tests should be run on an
machine with adequate resources") + unittest.SkipUnless(t, unittest.TEST_FLAKY, "epochs join/leave tests should be run on a machine with adequate resources") suite.Run(t, new(EpochJoinAndLeaveVNSuite)) } diff --git a/model/flow/address.go b/model/flow/address.go index d340ab45e48..19737e6bf19 100644 --- a/model/flow/address.go +++ b/model/flow/address.go @@ -258,6 +258,9 @@ const invalidCodeTestNetwork = uint64(0x6834ba37b3980209) // invalidCodeTransientNetwork is the invalid codeword used for transient test networks. const invalidCodeTransientNetwork = uint64(0x1cb159857af02018) +// invalidCodeCanaryNetwork is the invalid codeword used for Canary network. +const invalidCodeCanaryNetwork = uint64(0x1035ce4eff92ae01) + // encodeWord encodes a word into a code word. // In Flow, the word is the account index while the code word // is the corresponding address. diff --git a/model/flow/address_test.go b/model/flow/address_test.go index 581e249c64f..47dbb884e2c 100644 --- a/model/flow/address_test.go +++ b/model/flow/address_test.go @@ -98,6 +98,7 @@ func testAddressConstants(t *testing.T) { Mainnet, Testnet, Emulator, + Canary, } for _, chainID := range chainIDs { @@ -146,6 +147,7 @@ func testAddressGeneration(t *testing.T) { Mainnet, Testnet, Emulator, + Canary, } for _, chainID := range chainIDs { @@ -179,7 +181,7 @@ func testAddressGeneration(t *testing.T) { } } - if chainID != Emulator { + if chainID == Mainnet { // sanity check of address distances. // All distances between any two addresses must be less than d. @@ -231,12 +233,14 @@ func testAddressesIntersection(t *testing.T) { rand.Seed(time.Now().UnixNano()) // loops in each test - const loop = 50 + const loop = 25 // Test addresses for all type of networks chainIDs := []ChainID{ + Mainnet, Testnet, Emulator, + Canary, } for _, chainID := range chainIDs { @@ -246,22 +250,31 @@ func testAddressesIntersection(t *testing.T) { // All valid test addresses must fail Flow Mainnet check r := uint64(rand.Intn(maxIndex - loop)) state := chain.newAddressGeneratorAtIndex(r) - for i := 0; i < loop; i++ { + for k := 0; k < loop; k++ { address, err := state.NextAddress() require.NoError(t, err) - check := Mainnet.Chain().IsValid(address) - assert.False(t, check, "test account address format should be invalid in Flow") - sameChainCheck := chain.IsValid(address) - require.True(t, sameChainCheck) + for _, otherChain := range chainIDs { + if chainID != otherChain { + check := otherChain.Chain().IsValid(address) + assert.False(t, check, "test account address format should be invalid in Flow") + } else { + sameChainCheck := chain.IsValid(address) + require.True(t, sameChainCheck) + } + } } // sanity check: mainnet addresses must fail the test check r = uint64(rand.Intn(maxIndex - loop)) - for i := 0; i < loop; i++ { - invalidAddress, err := Mainnet.Chain().newAddressGeneratorAtIndex(r).NextAddress() - require.NoError(t, err) - check := chain.IsValid(invalidAddress) - assert.False(t, check, "account address format should be invalid") + for k := 0; k < loop; k++ { + for _, otherChain := range chainIDs { + if chainID != otherChain { + invalidAddress, err := otherChain.Chain().newAddressGeneratorAtIndex(r).NextAddress() + require.NoError(t, err) + check := chain.IsValid(invalidAddress) + assert.False(t, check, "account address format should be invalid") + } + } } // sanity check of invalid account addresses in all networks @@ -272,16 +285,14 @@ func testAddressesIntersection(t *testing.T) { r = uint64(rand.Intn(maxIndex - loop)) state =
chain.newAddressGeneratorAtIndex(r) - for i := 0; i < loop; i++ { + for k := 0; k < loop; k++ { address, err := state.NextAddress() require.NoError(t, err) invalidAddress = uint64ToAddress(address.uint64() ^ invalidCodeWord) + // must fail test network check check = chain.IsValid(invalidAddress) assert.False(t, check, "account address format should be invalid") - // must fail mainnet check - check := Mainnet.Chain().IsValid(invalidAddress) - assert.False(t, check, "account address format should be invalid") } } } @@ -298,6 +309,7 @@ func testIndexFromAddress(t *testing.T) { mainnet, testnet, emulator, + canary, } for _, chain := range chains { diff --git a/model/flow/chain.go b/model/flow/chain.go index 395ea41c8ba..e8cbc299408 100644 --- a/model/flow/chain.go +++ b/model/flow/chain.go @@ -50,8 +50,10 @@ func (c ChainID) getChainCodeWord() uint64 { switch c { case Mainnet: return 0 - case Testnet, Canary: + case Testnet: return invalidCodeTestNetwork + case Canary: + return invalidCodeCanaryNetwork case Emulator, Localnet, Benchnet: return invalidCodeTransientNetwork default: diff --git a/model/flow/cluster.go b/model/flow/cluster.go index 543ad718015..ba2f87b69c7 100644 --- a/model/flow/cluster.go +++ b/model/flow/cluster.go @@ -50,11 +50,12 @@ func NewClusterList(assignments AssignmentList, collectors IdentityList) (Cluste // build a lookup for all the identities by node identifier lookup := make(map[Identifier]*Identity) for _, collector := range collectors { + _, ok := lookup[collector.NodeID] + if ok { + return nil, fmt.Errorf("duplicate collector in list %v", collector.NodeID) + } lookup[collector.NodeID] = collector } - if len(lookup) != len(collectors) { - return nil, fmt.Errorf("duplicate collector in list") - } // replicate the identifier list but use identities instead clusters := make(ClusterList, 0, len(assignments)) diff --git a/model/flow/constants.go b/model/flow/constants.go index d7f60cda4c0..cfaa4e81a23 100644 --- a/model/flow/constants.go +++ b/model/flow/constants.go @@ -12,8 +12,13 @@ var GenesisTime = time.Date(2018, time.December, 19, 22, 32, 30, 42, time.UTC) // explicitly set during bootstrapping. const DefaultProtocolVersion = 0 -// DefaultTransactionExpiry is the default expiry for transactions, measured -// in blocks. Equivalent to 10 minutes for a 1-second block time. +// DefaultTransactionExpiry is the default expiry for transactions, measured in blocks. +// The default value is equivalent to 10 minutes for a 1-second block time. +// +// Let E be the transaction expiry.
If a transaction T specifies a reference +// block R with height H, then T may be included in any block B where: +// * R<-*B - meaning B has R as an ancestor, and +// * R.height < B.height <= R.height+E const DefaultTransactionExpiry = 10 * 60 // DefaultTransactionExpiryBuffer is the default buffer time between a transaction being ingested by a diff --git a/module/builder/collection/builder.go b/module/builder/collection/builder.go index ae22190cb84..645ed7cc2a4 100644 --- a/module/builder/collection/builder.go +++ b/module/builder/collection/builder.go @@ -15,6 +15,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/mempool" "github.com/onflow/flow-go/module/trace" + "github.com/onflow/flow-go/state/fork" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" @@ -36,15 +37,7 @@ type Builder struct { config Config } -func NewBuilder( - db *badger.DB, - tracer module.Tracer, - mainHeaders storage.Headers, - clusterHeaders storage.Headers, - payloads storage.ClusterPayloads, - transactions mempool.Transactions, - opts ...Opt, -) *Builder { +func NewBuilder(db *badger.DB, tracer module.Tracer, mainHeaders storage.Headers, clusterHeaders storage.Headers, payloads storage.ClusterPayloads, transactions mempool.Transactions, opts ...Opt) (*Builder, error) { b := Builder{ db: db, @@ -60,277 +53,276 @@ func NewBuilder( apply(&b.config) } - return &b + // sanity check config + if b.config.ExpiryBuffer >= flow.DefaultTransactionExpiry { + return nil, fmt.Errorf("invalid configured expiry buffer exceeds tx expiry (%d > %d)", b.config.ExpiryBuffer, flow.DefaultTransactionExpiry) + } + + return &b, nil } // BuildOn creates a new block built on the given parent. It produces a payload // that is valid with respect to the un-finalized chain it extends. func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) error) (*flow.Header, error) { - var proposal cluster.Block + var proposal cluster.Block // proposal we are building + var parent flow.Header // parent of the proposal we are building + var clusterChainFinalizedBlock flow.Header // finalized block on the cluster chain + var refChainFinalizedHeight uint64 // finalized height on reference chain + var refChainFinalizedID flow.Identifier // finalized block ID on reference chain startTime := time.Now() - // first we construct a proposal in-memory, ensuring it is a valid extension - // of chain state -- this can be done in a read-only transaction - err := b.db.View(func(tx *badger.Txn) error { + // STEP ONE: build a lookup for excluding duplicated transactions. + // This is briefly how it works: + // + // Let E be the global transaction expiry. + // When incorporating a new collection C, with reference height R, we enforce + // that it contains only transactions with reference heights in [R,R+E). + // * if we are building C: + // * we don't build expired collections (ie. our local finalized consensus height is at most R+E-1) + // * we don't include transactions referencing un-finalized blocks + // * therefore, C will contain only transactions with reference heights in [R,R+E) + // * if we are validating C: + // * honest validators only consider C valid if all its transactions have reference heights in [R,R+E) + // + // Therefore, to check for duplicates, we only need a lookup for transactions in collection + // with expiry windows that overlap with our collection under construction. 
+ // + // A collection with overlapping expiry window can be finalized or un-finalized. + // * to find all non-expired and finalized collections, we make use of an index + // (main_chain_finalized_height -> cluster_block_ids with respective reference height), to search for a range of main chain heights // which could be only referenced by collections with overlapping expiry windows. + // * to find all overlapping and un-finalized collections, we can't use the above index, because it's + // only for finalized collections. Instead, we simply traverse along the chain up to the last + // finalized block. This could possibly include some collections with expiry windows that DON'T + // overlap with our collection under construction, but it is unlikely and doesn't impact correctness. + // + // After combining both the finalized and un-finalized cluster blocks that overlap with our expiry window, + // we can iterate through their transactions, and build a lookup for excluding duplicated transactions. + err := b.db.View(func(btx *badger.Txn) error { - // STEP ONE: Load some things we need to do our work. // TODO (ramtin): enable this again // b.tracer.StartSpan(parentID, trace.COLBuildOnSetup) // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnSetup) - var parent flow.Header - err := operation.RetrieveHeader(parentID, &parent)(tx) + err := operation.RetrieveHeader(parentID, &parent)(btx) if err != nil { return fmt.Errorf("could not retrieve parent: %w", err) } // retrieve the height and ID of the latest finalized block ON THE MAIN CHAIN // this is used as the reference point for transaction expiry - var refChainFinalizedHeight uint64 - err = operation.RetrieveFinalizedHeight(&refChainFinalizedHeight)(tx) + err = operation.RetrieveFinalizedHeight(&refChainFinalizedHeight)(btx) if err != nil { return fmt.Errorf("could not retrieve main finalized height: %w", err) } - var refChainFinalizedID flow.Identifier - err = operation.LookupBlockHeight(refChainFinalizedHeight, &refChainFinalizedID)(tx) + err = operation.LookupBlockHeight(refChainFinalizedHeight, &refChainFinalizedID)(btx) if err != nil { return fmt.Errorf("could not retrieve main finalized ID: %w", err) } // retrieve the finalized boundary ON THE CLUSTER CHAIN - var clusterFinal flow.Header - err = procedure.RetrieveLatestFinalizedClusterHeader(parent.ChainID, &clusterFinal)(tx) + err = procedure.RetrieveLatestFinalizedClusterHeader(parent.ChainID, &clusterChainFinalizedBlock)(btx) if err != nil { return fmt.Errorf("could not retrieve cluster final: %w", err) } + return nil + }) + if err != nil { + return nil, err + } - // STEP TWO: create a lookup of all previously used transactions on the - // part of the chain we care about. We do this separately for - // un-finalized and finalized sections of the chain to decide whether to - // remove conflicting transactions from the mempool. 
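// [Illustrative aside, not part of the patch — a worked instance of the expiry
// rule described in the STEP ONE comment above, using E = flow.DefaultTransactionExpiry = 600
// from this same change set: a collection whose reference block has height R = 5000
// may only contain transactions with reference heights in [5000, 5600); a transaction
// referencing height 4999 or 5600 would make the collection invalid.]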
+ // pre-compute the minimum possible reference block height for transactions + // included in this collection (actual reference height may be greater) + minPossibleRefHeight := refChainFinalizedHeight - uint64(flow.DefaultTransactionExpiry-b.config.ExpiryBuffer) + if minPossibleRefHeight > refChainFinalizedHeight { + minPossibleRefHeight = 0 // overflow check + } - // TODO (ramtin): enable this again - // b.tracer.FinishSpan(parentID, trace.COLBuildOnSetup) - // b.tracer.StartSpan(parentID, trace.COLBuildOnUnfinalizedLookup) - // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup) - - // RATE LIMITING: the builder module can be configured to limit the - // rate at which transactions with a common payer are included in - // blocks. Depending on the configured limit, we either allow 1 - // transaction every N sequential collections, or we allow K transactions - // per collection. - - // keep track of transactions in the ancestry to avoid duplicates - lookup := newTransactionLookup() - // keep track of transactions to enforce rate limiting - limiter := newRateLimiter(b.config, parent.Height+1) - - // look up previously included transactions in UN-FINALIZED ancestors - ancestorID := parentID - clusterFinalID := clusterFinal.ID() - for ancestorID != clusterFinalID { - ancestor, err := b.clusterHeaders.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not get ancestor header (%x): %w", ancestorID, err) - } - - if ancestor.Height <= clusterFinal.Height { - return fmt.Errorf("should always build on last finalized block") - } - - payload, err := b.payloads.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not get ancestor payload (%x): %w", ancestorID, err) - } - - collection := payload.Collection - for _, tx := range collection.Transactions { - lookup.addUnfinalizedAncestor(tx.ID()) - limiter.addAncestor(ancestor.Height, tx) - } - ancestorID = ancestor.ParentID + // TODO (ramtin): enable this again + // b.tracer.FinishSpan(parentID, trace.COLBuildOnSetup) + // b.tracer.StartSpan(parentID, trace.COLBuildOnUnfinalizedLookup) + // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup) + + // STEP TWO: create a lookup of all previously used transactions on the + // part of the chain we care about. We do this separately for + // un-finalized and finalized sections of the chain to decide whether to + // remove conflicting transactions from the mempool. + + // keep track of transactions in the ancestry to avoid duplicates + lookup := newTransactionLookup() + // keep track of transactions to enforce rate limiting + limiter := newRateLimiter(b.config, parent.Height+1) + + // RATE LIMITING: the builder module can be configured to limit the + // rate at which transactions with a common payer are included in + // blocks. Depending on the configured limit, we either allow 1 + // transaction every N sequential collections, or we allow K transactions + // per collection. 
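// [Illustrative aside, not part of the patch. The minPossibleRefHeight
// computation above relies on uint64 underflow: when the finalized height is
// smaller than the expiry window, the subtraction wraps around to a huge value,
// which the `> refChainFinalizedHeight` comparison detects and clamps to zero.
// A minimal standalone sketch of the same pattern (the function name and
// signature are hypothetical, not from the patch):]
func lowestPossibleRefHeight(finalizedHeight, expiry, buffer uint64) uint64 {
	minHeight := finalizedHeight - (expiry - buffer)
	if minHeight > finalizedHeight {
		return 0 // subtraction underflowed: clamp to the genesis height
	}
	return minHeight
}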
+ + // first, look up previously included transactions in UN-FINALIZED ancestors + err = b.populateUnfinalizedAncestryLookup(parentID, clusterChainFinalizedBlock.Height, lookup, limiter) + if err != nil { + return nil, fmt.Errorf("could not populate un-finalized ancestry lookup (parent_id=%x): %w", parentID, err) + } + + // TODO (ramtin): enable this again + // b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup) + // b.tracer.StartSpan(parentID, trace.COLBuildOnFinalizedLookup) + // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnFinalizedLookup) + + // second, look up previously included transactions in FINALIZED ancestors + err = b.populateFinalizedAncestryLookup(minPossibleRefHeight, refChainFinalizedHeight, lookup, limiter) + if err != nil { + return nil, fmt.Errorf("could not populate finalized ancestry lookup: %w", err) + } + + // TODO (ramtin): enable this again + // b.tracer.FinishSpan(parentID, trace.COLBuildOnFinalizedLookup) + // b.tracer.StartSpan(parentID, trace.COLBuildOnCreatePayload) + // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnCreatePayload) + + // STEP THREE: build a payload of valid transactions, while at the same + // time figuring out the correct reference block ID for the collection. + + // keep track of the actual smallest reference height of all included transactions + minRefHeight := uint64(math.MaxUint64) + minRefID := refChainFinalizedID + + var transactions []*flow.TransactionBody + var totalByteSize uint64 + var totalGas uint64 + for _, tx := range b.transactions.All() { + + // if we have reached maximum number of transactions, stop + if uint(len(transactions)) >= b.config.MaxCollectionSize { + break } - // TODO (ramtin): enable this again - // b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup) - // b.tracer.StartSpan(parentID, trace.COLBuildOnFinalizedLookup) - // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnFinalizedLookup) - - //TODO for now we check a fixed # of finalized ancestors - we should - // instead look back based on reference block ID and expiry - // ref: https://github.com/dapperlabs/flow-go/issues/3556 - limit := clusterFinal.Height - flow.DefaultTransactionExpiry - if limit > clusterFinal.Height { // overflow check - limit = 0 + txByteSize := uint64(tx.ByteSize()) + // ignore transactions with tx byte size bigger than the max amount per collection + // this case shouldn't happen ever since we keep a limit on tx byte size but in case + // we keep this condition + if txByteSize > b.config.MaxCollectionByteSize { + continue } - // look up previously included transactions in FINALIZED ancestors - ancestorID = clusterFinal.ID() - ancestorHeight = clusterFinal.Height - for ancestorHeight > limit { - ancestor, err := b.clusterHeaders.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not get ancestor header (%x): %w", ancestorID, err) - } - payload, err := b.payloads.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not get ancestor payload (%x): %w", ancestorID, err) - } - - collection := payload.Collection - for _, tx := range collection.Transactions { - lookup.addFinalizedAncestor(tx.ID()) - limiter.addAncestor(ancestor.Height, tx) - } - - ancestorID = ancestor.ParentID - ancestorHeight = ancestor.Height + // because the max byte size per tx is way smaller than the max collection byte size, we can stop here and not continue.
+ // to make it more effective in the future we can continue adding smaller ones + if totalByteSize+txByteSize > b.config.MaxCollectionByteSize { + break } - // STEP THREE: build a payload of valid transactions, while at the same - // time figuring out the correct reference block ID for the collection. + // ignore transactions with max gas bigger than the max total gas per collection + // this case shouldn't happen ever but in case we keep this condition + if tx.GasLimit > b.config.MaxCollectionTotalGas { + continue + } - // TODO (ramtin): enable this again - // b.tracer.FinishSpan(parentID, trace.COLBuildOnFinalizedLookup) - // b.tracer.StartSpan(parentID, trace.COLBuildOnCreatePayload) - // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnCreatePayload) - - minRefHeight := uint64(math.MaxUint64) - // start with the finalized reference ID (longest expiry time) - minRefID := refChainFinalizedID - - var transactions []*flow.TransactionBody - var totalByteSize uint64 - var totalGas uint64 - for _, tx := range b.transactions.All() { - - // if we have reached maximum number of transactions, stop - if uint(len(transactions)) >= b.config.MaxCollectionSize { - break - } - - txByteSize := uint64(tx.ByteSize()) - // ignore transactions with tx byte size bigger that the max amount per collection - // this case shouldn't happen ever since we keep a limit on tx byte size but in case - // we keep this condition - if txByteSize > b.config.MaxCollectionByteSize { - continue - } - - // because the max byte size per tx is way smaller than the max collection byte size, we can stop here and not continue. - // to make it more effective in the future we can continue adding smaller ones - if totalByteSize+txByteSize > b.config.MaxCollectionByteSize { - break - } - - // ignore transactions with max gas bigger that the max total gas per collection - // this case shouldn't happen ever but in case we keep this condition - if tx.GasLimit > b.config.MaxCollectionTotalGas { - continue - } - - // cause the max gas limit per tx is way smaller than the total max gas per collection, we can stop here and not continue. - // to make it more effective in the future we can continue adding smaller ones - if totalGas+tx.GasLimit > b.config.MaxCollectionTotalGas { - break - } - - // retrieve the main chain header that was used as reference - refHeader, err := b.mainHeaders.ByBlockID(tx.ReferenceBlockID) - if errors.Is(err, storage.ErrNotFound) { - continue // in case we are configured with liberal transaction ingest rules - } - if err != nil { - return fmt.Errorf("could not retrieve reference header: %w", err) - } - - // for now, disallow un-finalized reference blocks - if refChainFinalizedHeight < refHeader.Height { - continue - } - - // ensure the reference block is not too old - txID := tx.ID() - if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.config.ExpiryBuffer) { - // the transaction is expired, it will never be valid - b.transactions.Rem(txID) - continue - } - - // check that the transaction was not already used in un-finalized history - if lookup.isUnfinalizedAncestor(txID) { - continue - } - - // check that the transaction was not already included in finalized history.
- if lookup.isFinalizedAncestor(txID) { - // remove from mempool, conflicts with finalized block will never be valid - b.transactions.Rem(txID) - continue - } - - // enforce rate limiting rules - if limiter.shouldRateLimit(tx) { - continue - } - - // ensure we find the lowest reference block height - if refHeader.Height < minRefHeight { - minRefHeight = refHeader.Height - minRefID = tx.ReferenceBlockID - } - - // update per-payer transaction count - limiter.transactionIncluded(tx) - - transactions = append(transactions, tx) - totalByteSize += txByteSize - totalGas += tx.GasLimit + // because the max gas limit per tx is way smaller than the total max gas per collection, we can stop here and not continue. + // to make it more effective in the future we can continue adding smaller ones + if totalGas+tx.GasLimit > b.config.MaxCollectionTotalGas { + break } - // STEP FOUR: we have a set of transactions that are valid to include - // on this fork. Now we need to create the collection that will be - // used in the payload and construct the final proposal model - // TODO (ramtin): enable this again - // b.tracer.FinishSpan(parentID, trace.COLBuildOnCreatePayload) - // b.tracer.StartSpan(parentID, trace.COLBuildOnCreateHeader) - // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnCreateHeader) - - // build the payload from the transactions - payload := cluster.PayloadFromTransactions(minRefID, transactions...) - - header := flow.Header{ - ChainID: parent.ChainID, - ParentID: parentID, - Height: parent.Height + 1, - PayloadHash: payload.Hash(), - Timestamp: time.Now().UTC(), - - // NOTE: we rely on the HotStuff-provided setter to set the other - // fields, which are related to signatures and HotStuff internals + // retrieve the main chain header that was used as reference + refHeader, err := b.mainHeaders.ByBlockID(tx.ReferenceBlockID) + if errors.Is(err, storage.ErrNotFound) { + continue // in case we are configured with liberal transaction ingest rules + } + if err != nil { + return nil, fmt.Errorf("could not retrieve reference header: %w", err) } - // set fields specific to the consensus algorithm - err = setter(&header) + // disallow un-finalized reference blocks + if refChainFinalizedHeight < refHeader.Height { + continue + } + // make sure the reference block is finalized and not orphaned + blockFinalizedAtReferenceHeight, err := b.mainHeaders.ByHeight(refHeader.Height) if err != nil { - return fmt.Errorf("could not set fields to header: %w", err) + return nil, fmt.Errorf("could not check that reference block (id=%x) is finalized: %w", tx.ReferenceBlockID, err) + } + if blockFinalizedAtReferenceHeight.ID() != tx.ReferenceBlockID { + // the transaction references an orphaned block - it will never be valid + b.transactions.Rem(tx.ID()) + continue } - proposal = cluster.Block{ - Header: &header, - Payload: &payload, + // ensure the reference block is not too old + if refHeader.Height < minPossibleRefHeight { + // the transaction is expired, it will never be valid + b.transactions.Rem(tx.ID()) + continue } - // TODO (ramtin): enable this again - // b.tracer.FinishSpan(parentID, trace.COLBuildOnCreateHeader) + txID := tx.ID() + // check that the transaction was not already used in un-finalized history + if lookup.isUnfinalizedAncestor(txID) { + continue } - return nil - }) + // check that the transaction was not already included in finalized history.
+ if lookup.isFinalizedAncestor(txID) { + // remove from mempool, conflicts with finalized block will never be valid + b.transactions.Rem(txID) + continue + } + + // enforce rate limiting rules + if limiter.shouldRateLimit(tx) { + continue + } + + // ensure we find the lowest reference block height + if refHeader.Height < minRefHeight { + minRefHeight = refHeader.Height + minRefID = tx.ReferenceBlockID + } + + // update per-payer transaction count + limiter.transactionIncluded(tx) + + transactions = append(transactions, tx) + totalByteSize += txByteSize + totalGas += tx.GasLimit + } + + // STEP FOUR: we have a set of transactions that are valid to include + // on this fork. Now we need to create the collection that will be + // used in the payload and construct the final proposal model + // TODO (ramtin): enable this again + // b.tracer.FinishSpan(parentID, trace.COLBuildOnCreatePayload) + // b.tracer.StartSpan(parentID, trace.COLBuildOnCreateHeader) + // defer b.tracer.FinishSpan(parentID, trace.COLBuildOnCreateHeader) + + // build the payload from the transactions + payload := cluster.PayloadFromTransactions(minRefID, transactions...) + + header := flow.Header{ + ChainID: parent.ChainID, + ParentID: parentID, + Height: parent.Height + 1, + PayloadHash: payload.Hash(), + Timestamp: time.Now().UTC(), + + // NOTE: we rely on the HotStuff-provided setter to set the other + // fields, which are related to signatures and HotStuff internals + } + + // set fields specific to the consensus algorithm + err = setter(&header) if err != nil { - return nil, fmt.Errorf("could not build block: %w", err) + return nil, fmt.Errorf("could not set fields to header: %w", err) + } + + proposal = cluster.Block{ + Header: &header, + Payload: &payload, } + // TODO (ramtin): enable this again + // b.tracer.FinishSpan(parentID, trace.COLBuildOnCreateHeader) + span, ctx, _ := b.tracer.StartCollectionSpan(context.Background(), proposal.ID(), trace.COLBuildOn, opentracing.StartTime(startTime)) defer span.Finish() @@ -343,5 +335,100 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er return nil, fmt.Errorf("could not insert built block: %w", err) } - return proposal.Header, err + return proposal.Header, nil +} + +// populateUnfinalizedAncestryLookup traverses the unfinalized ancestry backward +// to populate the transaction lookup (used for deduplication) and the rate limiter +// (used to limit transaction submission by payer). +// +// The traversal begins with the block specified by parentID (the block we are +// building on top of) and ends with the oldest unfinalized block in the ancestry. +func (b *Builder) populateUnfinalizedAncestryLookup(parentID flow.Identifier, finalHeight uint64, lookup *transactionLookup, limiter *rateLimiter) error { + + err := fork.TraverseBackward(b.clusterHeaders, parentID, func(ancestor *flow.Header) error { + payload, err := b.payloads.ByBlockID(ancestor.ID()) + if err != nil { + return fmt.Errorf("could not retrieve ancestor payload: %w", err) + } + + for _, tx := range payload.Collection.Transactions { + lookup.addUnfinalizedAncestor(tx.ID()) + limiter.addAncestor(ancestor.Height, tx) + } + return nil + }, fork.ExcludingHeight(finalHeight)) + + return err +} + +// populateFinalizedAncestryLookup traverses the reference block height index to +// populate the transaction lookup (used for deduplication) and the rate limiter +// (used to limit transaction submission by payer). 
+// +// The traversal is structured so that we check every collection whose reference +// block height translates to a possible constituent transaction which could also +// appear in the collection we are building. +func (b *Builder) populateFinalizedAncestryLookup(minRefHeight, maxRefHeight uint64, lookup *transactionLookup, limiter *rateLimiter) error { + + // Let E be the global transaction expiry constant, measured in blocks. For each + // T ∈ `includedTransactions`, we have to decide whether the transaction + // already appeared in _any_ finalized cluster block. + // Notation: + // - consider a valid cluster block C and let c be its reference block height + // - consider a transaction T ∈ `includedTransactions` and let t denote its + // reference block height + // + // Boundary conditions: + // 1. C's reference block height is equal to the lowest reference block height of + // all its constituent transactions. Hence, for collection C to potentially contain T, it must satisfy c <= t. + // 2. For T to be eligible for inclusion in collection C, _none_ of the transactions within C are allowed + // to be expired w.r.t. C's reference block. Hence, for collection C to potentially contain T, it must satisfy t < c + E. + // + // Therefore, for collection C to potentially contain transaction T, it must satisfy t - E < c <= t. + // In other words, we only need to inspect collections with reference block height c ∈ (t-E, t]. + // Consequently, for a set of transactions, with `minRefHeight` (`maxRefHeight`) being the smallest (largest) + // reference block height, we only need to inspect collections with c ∈ (minRefHeight-E, maxRefHeight]. + + // the finalized cluster blocks which could possibly contain any conflicting transactions + var clusterBlockIDs []flow.Identifier + start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight) + err := b.db.View(operation.LookupClusterBlocksByReferenceHeightRange(start, end, &clusterBlockIDs)) + if err != nil { + return fmt.Errorf("could not lookup finalized cluster blocks by reference height range [%d,%d]: %w", start, end, err) + } + + for _, blockID := range clusterBlockIDs { + header, err := b.clusterHeaders.ByBlockID(blockID) + if err != nil { + return fmt.Errorf("could not retrieve cluster header (id=%x): %w", blockID, err) + } + payload, err := b.payloads.ByBlockID(blockID) + if err != nil { + return fmt.Errorf("could not retrieve cluster payload (block_id=%x): %w", blockID, err) + } + for _, tx := range payload.Collection.Transactions { + lookup.addFinalizedAncestor(tx.ID()) + limiter.addAncestor(header.Height, tx) + } + } + + return nil +} + +// findRefHeightSearchRangeForConflictingClusterBlocks computes the range of reference +// block heights of ancestor blocks which could possibly contain transactions +// duplicating those in our collection under construction, based on the range of +// reference heights of transactions in the collection under construction. +// +// Input range is the (inclusive) range of reference heights of transactions included +// in the collection under construction. Output range is the (inclusive) range of +// reference heights which need to be searched. 
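// [Illustrative aside, not part of the patch — a worked instance of the
// interval arithmetic above, assuming E = flow.DefaultTransactionExpiry = 600:
// for a collection under construction whose transactions have
// minRefHeight = 1000 and maxRefHeight = 1200, duplicates can only appear in
// finalized cluster blocks with reference height c ∈ (1000-600, 1200] =
// (400, 1200], so the function below returns start = 401, end = 1200.]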
+func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64) (start, end uint64) { + start = minRefHeight - flow.DefaultTransactionExpiry + 1 + if start > minRefHeight { + start = 0 // overflow check + } + end = maxRefHeight + return start, end } diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index 7f29688edb0..a1044926c48 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -27,6 +27,7 @@ import ( "github.com/onflow/flow-go/state/protocol/inmem" "github.com/onflow/flow-go/state/protocol/util" storage "github.com/onflow/flow-go/storage/badger" + "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" sutil "github.com/onflow/flow-go/storage/util" "github.com/onflow/flow-go/utils/unittest" @@ -79,12 +80,12 @@ func (suite *BuilderSuite) SetupTest() { suite.payloads = storage.NewClusterPayloads(metrics, suite.db) clusterStateRoot, err := clusterkv.NewStateRoot(suite.genesis) - suite.Require().Nil(err) + suite.Require().NoError(err) clusterState, err := clusterkv.Bootstrap(suite.db, clusterStateRoot) - suite.Require().Nil(err) + suite.Require().NoError(err) suite.state, err = clusterkv.NewMutableState(clusterState, tracer, suite.headers, suite.payloads) - suite.Require().Nil(err) + suite.Require().NoError(err) // just bootstrap with a genesis block, we'll use this as reference participants := unittest.IdentityListFixture(5, unittest.WithAllRoles()) @@ -114,39 +115,51 @@ func (suite *BuilderSuite) SetupTest() { suite.Assert().True(added) } - suite.builder = builder.NewBuilder(suite.db, tracer, suite.headers, suite.headers, suite.payloads, suite.pool) + suite.builder, _ = builder.NewBuilder(suite.db, tracer, suite.headers, suite.headers, suite.payloads, suite.pool) } // runs after each test finishes func (suite *BuilderSuite) TearDownTest() { err := suite.db.Close() - suite.Assert().Nil(err) + suite.Assert().NoError(err) err = os.RemoveAll(suite.dbdir) - suite.Assert().Nil(err) + suite.Assert().NoError(err) } func (suite *BuilderSuite) InsertBlock(block model.Block) { err := suite.db.Update(procedure.InsertClusterBlock(&block)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) } -func (suite *BuilderSuite) FinalizeBlock(blockID flow.Identifier) { - err := suite.db.Update(procedure.FinalizeClusterBlock(blockID)) - suite.Assert().Nil(err) +func (suite *BuilderSuite) FinalizeBlock(block model.Block) { + err := suite.db.Update(func(tx *badger.Txn) error { + var refBlock flow.Header + err := operation.RetrieveHeader(block.Payload.ReferenceBlockID, &refBlock)(tx) + if err != nil { + return err + } + err = procedure.FinalizeClusterBlock(block.ID())(tx) + if err != nil { + return err + } + err = operation.IndexClusterBlockByReferenceHeight(refBlock.Height, block.ID())(tx) + return err + }) + suite.Assert().NoError(err) } // Payload returns a payload containing the given transactions, with a valid // reference block ID. func (suite *BuilderSuite) Payload(transactions ...*flow.TransactionBody) model.Payload { final, err := suite.protoState.Final().Head() - suite.Require().Nil(err) + suite.Require().NoError(err) return model.PayloadFromTransactions(final.ID(), transactions...) } // ProtoStateRoot returns the root block of the protocol state. 
func (suite *BuilderSuite) ProtoStateRoot() *flow.Header { root, err := suite.protoState.Params().Root() - suite.Require().Nil(err) + suite.Require().NoError(err) return root } @@ -187,7 +200,7 @@ func (suite *BuilderSuite) TestBuildOn_Success() { } header, err := suite.builder.BuildOn(suite.genesis.ID(), setter) - suite.Require().Nil(err) + suite.Require().NoError(err) // setter should have been run suite.Assert().Equal(expectedHeight, header.Height) @@ -195,13 +208,13 @@ func (suite *BuilderSuite) TestBuildOn_Success() { // should be able to retrieve built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) builtCollection := built.Payload.Collection // should reference a valid reference block // (since genesis is the only block, it's the only valid reference) mainGenesis, err := suite.protoState.AtHeight(0).Head() - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Equal(mainGenesis.ID(), built.Payload.ReferenceBlockID) // payload should include only items from mempool @@ -210,8 +223,7 @@ func (suite *BuilderSuite) TestBuildOn_Success() { suite.Assert().True(collectionContains(builtCollection, flow.GetIDs(mempoolTransactions)...)) } -// when there are transactions with an unknown reference block in the pool, -// we should not include them in collections +// when there are transactions with an unknown reference block in the pool, we should not include them in collections func (suite *BuilderSuite) TestBuildOn_WithUnknownReferenceBlock() { // before modifying the mempool, note the valid transactions already in the pool @@ -223,21 +235,101 @@ func (suite *BuilderSuite) TestBuildOn_WithUnknownReferenceBlock() { suite.pool.Add(&unknownReferenceTx) header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) // should be able to retrieve built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) builtCollection := built.Payload.Collection suite.Assert().Len(builtCollection.Transactions, 3) // payload should include only the transactions with a valid reference block suite.Assert().True(collectionContains(builtCollection, flow.GetIDs(validMempoolTransactions)...)) - // should not contain the the unknown-reference transaction + // should not contain the unknown-reference transaction suite.Assert().False(collectionContains(builtCollection, unknownReferenceTx.ID())) } +// when there are transactions with a known but unfinalized reference block in the pool, we should not include them in collections +func (suite *BuilderSuite) TestBuildOn_WithUnfinalizedReferenceBlock() { + + // before modifying the mempool, note the valid transactions already in the pool + validMempoolTransactions := suite.pool.All() + + // add an unfinalized block to the protocol state + genesis, err := suite.protoState.Final().Head() + suite.Require().NoError(err) + unfinalizedReferenceBlock := unittest.BlockWithParentFixture(genesis) + unfinalizedReferenceBlock.SetPayload(flow.EmptyPayload()) + err = suite.protoState.Extend(context.Background(), unfinalizedReferenceBlock) + suite.Require().NoError(err) + + // add a transaction with unfinalized reference block to the pool + unfinalizedReferenceTx := unittest.TransactionBodyFixture() + unfinalizedReferenceTx.ReferenceBlockID = unfinalizedReferenceBlock.ID() + 
suite.pool.Add(&unfinalizedReferenceTx) + + header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) + suite.Require().NoError(err) + + // should be able to retrieve built block from storage + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().NoError(err) + builtCollection := built.Payload.Collection + + suite.Assert().Len(builtCollection.Transactions, 3) + // payload should include only the transactions with a valid reference block + suite.Assert().True(collectionContains(builtCollection, flow.GetIDs(validMempoolTransactions)...)) + // should not contain the unfinalized-reference transaction + suite.Assert().False(collectionContains(builtCollection, unfinalizedReferenceTx.ID())) +} + +// when there are transactions with an orphaned reference block in the pool, we should not include them in collections +func (suite *BuilderSuite) TestBuildOn_WithOrphanedReferenceBlock() { + + // before modifying the mempool, note the valid transactions already in the pool + validMempoolTransactions := suite.pool.All() + + // add an orphaned block to the protocol state + genesis, err := suite.protoState.Final().Head() + suite.Require().NoError(err) + // create a block extending genesis which will be orphaned + orphan := unittest.BlockWithParentFixture(genesis) + orphan.SetPayload(flow.EmptyPayload()) + err = suite.protoState.Extend(context.Background(), orphan) + suite.Require().NoError(err) + // create and finalize a block on top of genesis, orphaning `orphan` + block1 := unittest.BlockWithParentFixture(genesis) + block1.SetPayload(flow.EmptyPayload()) + err = suite.protoState.Extend(context.Background(), block1) + suite.Require().NoError(err) + err = suite.protoState.Finalize(context.Background(), block1.ID()) + suite.Require().NoError(err) + + // add a transaction with orphaned reference block to the pool + orphanedReferenceTx := unittest.TransactionBodyFixture() + orphanedReferenceTx.ReferenceBlockID = orphan.ID() + suite.pool.Add(&orphanedReferenceTx) + + header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) + suite.Require().NoError(err) + + // should be able to retrieve built block from storage + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().NoError(err) + builtCollection := built.Payload.Collection + + suite.Assert().Len(builtCollection.Transactions, 3) + // payload should include only the transactions with a valid reference block + suite.Assert().True(collectionContains(builtCollection, flow.GetIDs(validMempoolTransactions)...)) + // should not contain the unknown-reference transaction + suite.Assert().False(collectionContains(builtCollection, orphanedReferenceTx.ID())) + // the transaction with orphaned reference should be removed from the mempool + suite.Assert().False(suite.pool.Has(orphanedReferenceTx.ID())) +} + func (suite *BuilderSuite) TestBuildOn_WithForks() { t := suite.T() @@ -264,12 +356,12 @@ func (suite *BuilderSuite) TestBuildOn_WithForks() { // build on top of fork 1 header, err := suite.builder.BuildOn(block1.ID(), noopSetter) - require.Nil(t, err) + require.NoError(t, err) // should be able to retrieve built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - assert.Nil(t, err) + assert.NoError(t, err) builtCollection := built.Payload.Collection // payload should include ONLY tx2 and tx3 @@ -293,27 +385,26 @@ func (suite *BuilderSuite) 
TestBuildOn_ConflictingFinalizedBlock() { finalizedBlock := unittest.ClusterBlockWithParent(suite.genesis) finalizedBlock.SetPayload(finalizedPayload) suite.InsertBlock(finalizedBlock) - t.Logf("finalized: id=%s\tparent_id=%s\theight=%d\n", finalizedBlock.ID(), finalizedBlock.Header.ParentID, finalizedBlock.Header.Height) + t.Logf("finalized: height=%d id=%s txs=%s parent_id=%s\n", finalizedBlock.Header.Height, finalizedBlock.ID(), finalizedPayload.Collection.Light(), finalizedBlock.Header.ParentID) // build a block containing tx2 on the first block unFinalizedPayload := suite.Payload(tx2) unFinalizedBlock := unittest.ClusterBlockWithParent(&finalizedBlock) unFinalizedBlock.SetPayload(unFinalizedPayload) suite.InsertBlock(unFinalizedBlock) - t.Logf("unfinalized: id=%s\tparent_id=%s\theight=%d\n", unFinalizedBlock.ID(), unFinalizedBlock.Header.ParentID, unFinalizedBlock.Header.Height) + t.Logf("unfinalized: height=%d id=%s txs=%s parent_id=%s\n", unFinalizedBlock.Header.Height, unFinalizedBlock.ID(), unFinalizedPayload.Collection.Light(), unFinalizedBlock.Header.ParentID) // finalize first block - err := suite.db.Update(procedure.FinalizeClusterBlock(finalizedBlock.ID())) - assert.Nil(t, err) + suite.FinalizeBlock(finalizedBlock) // build on the un-finalized block header, err := suite.builder.BuildOn(unFinalizedBlock.ID(), noopSetter) - require.Nil(t, err) + require.NoError(t, err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - assert.Nil(t, err) + assert.NoError(t, err) builtCollection := built.Payload.Collection // payload should only contain tx3 @@ -353,17 +444,16 @@ func (suite *BuilderSuite) TestBuildOn_ConflictingInvalidatedForks() { t.Logf("invalidated: id=%s\tparent_id=%s\theight=%d\n", invalidatedBlock.ID(), invalidatedBlock.Header.ParentID, invalidatedBlock.Header.Height) // finalize first block - this indirectly invalidates the second block - err := suite.db.Update(procedure.FinalizeClusterBlock(finalizedBlock.ID())) - assert.Nil(t, err) + suite.FinalizeBlock(finalizedBlock) // build on the finalized block header, err := suite.builder.BuildOn(finalizedBlock.ID(), noopSetter) - require.Nil(t, err) + require.NoError(t, err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - assert.Nil(t, err) + assert.NoError(t, err) builtCollection := built.Payload.Collection // tx2 and tx3 should be in the built collection @@ -377,11 +467,11 @@ func (suite *BuilderSuite) TestBuildOn_LargeHistory() { // use a mempool with 2000 transactions, one per block suite.pool = herocache.NewTransactions(2000, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10000)) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10000)) // get a valid reference block ID final, err := suite.protoState.Final().Head() - require.Nil(t, err) + require.NoError(t, err) refID := final.ID() // keep track of the head of the chain @@ -390,10 +480,9 @@ func (suite *BuilderSuite) TestBuildOn_LargeHistory() { // keep track of invalidated transaction IDs var invalidatedTxIds []flow.Identifier - // create 1000 blocks containing 1000 transactions - //TODO for now limit this test to no more blocks than
we look back by - // when de-duplicating transactions. - for i := 0; i < flow.DefaultTransactionExpiry-1; i++ { + // create a large history of blocks with invalidated forks every 3 blocks on + // average - build until the height exceeds transaction expiry + for i := 0; ; i++ { // create a transaction tx := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { @@ -405,7 +494,7 @@ // 1/3 of the time create a conflicting fork that will be invalidated // don't do this the first and last few times to ensure we don't - // try to fork genesis and the the last block is the valid fork. + // try to fork genesis and the last block is the valid fork. conflicting := rand.Intn(3) == 0 && i > 5 && i < 995 // by default, build on the head - if we are building a @@ -413,7 +502,7 @@ parent := head if conflicting { err = suite.db.View(procedure.RetrieveClusterBlock(parent.Header.ParentID, &parent)) - assert.Nil(t, err) + assert.NoError(t, err) // add the transaction to the invalidated list invalidatedTxIds = append(invalidatedTxIds, tx.ID()) } @@ -427,21 +516,28 @@ // reset the valid head if we aren't building a conflicting fork if !conflicting { head = block - err = suite.db.Update(procedure.FinalizeClusterBlock(block.ID())) - assert.Nil(t, err) + suite.FinalizeBlock(block) + } + + // stop building blocks once we've built a history which exceeds the transaction + // expiry length - this tests that deduplication works properly against old blocks + // which nevertheless have a potentially conflicting reference block + if head.Header.Height > flow.DefaultTransactionExpiry+100 { + break + } } t.Log("conflicting: ", len(invalidatedTxIds)) - // build on the the head block + // build on the head block header, err := suite.builder.BuildOn(head.ID(), noopSetter) - require.Nil(t, err) + require.NoError(t, err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - require.Nil(t, err) + require.NoError(t, err) builtCollection := built.Payload.Collection // payload should only contain transactions from invalidated blocks @@ -451,16 +547,16 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSize() { // set the max collection size to 1 - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(1)) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(1)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Require().Nil(err) + suite.Require().NoError(err) builtCollection := built.Payload.Collection // should be only 1 transaction in the collection @@ -469,16 +565,16 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionByteSize() { // set the max collection byte size to 400 (each tx is about 150 bytes) - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers,
suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionByteSize(400)) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionByteSize(400)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Require().Nil(err) + suite.Require().NoError(err) builtCollection := built.Payload.Collection // should be only 2 transactions in the collection, since each tx is ~150 bytes and the limit is 400 bytes @@ -487,16 +583,16 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionTotalGas() { // set the max gas to 20,000 - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionTotalGas(20000)) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionTotalGas(20000)) // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Require().Nil(err) + suite.Require().NoError(err) builtCollection := built.Payload.Collection // should be only 2 transactions in collection, since each transaction has gas limit of 9,999 and collection limit is set to 20,000 @@ -507,7 +603,7 @@ func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() { // create enough main-chain blocks that an expired transaction is possible genesis, err := suite.protoState.Final().Head() - suite.Require().Nil(err) + suite.Require().NoError(err) head := genesis for i := 0; i < flow.DefaultTransactionExpiry+1; i++ { @@ -516,15 +612,15 @@ block.Payload.Seals = nil block.Header.PayloadHash = block.Payload.Hash() err = suite.protoState.Extend(context.Background(), block) - suite.Require().Nil(err) + suite.Require().NoError(err) err = suite.protoState.Finalize(context.Background(), block.ID()) - suite.Require().Nil(err) + suite.Require().NoError(err) head = block.Header } // reset the pool and builder suite.pool = herocache.NewTransactions(10, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool) // insert a transaction referring genesis (now expired) tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { @@ -547,12 +643,12 @@ func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() { // build a block header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) // retrieve the built block from storage var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Require().Nil(err) + suite.Require().NoError(err) builtCollection := built.Payload.Collection // the block should only contain the
un-expired transaction @@ -566,19 +662,19 @@ func (suite *BuilderSuite) TestBuildOn_EmptyMempool() { // start with an empty mempool suite.pool = herocache.NewTransactions(1000, unittest.Logger(), metrics.NewNoopCollector()) - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool) + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool) header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Require().Nil(err) + suite.Require().NoError(err) // should reference a valid reference block // (since genesis is the only block, it's the only valid reference) mainGenesis, err := suite.protoState.AtHeight(0).Head() - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Equal(mainGenesis.ID(), built.Payload.ReferenceBlockID) // the payload should be empty @@ -593,7 +689,7 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { suite.ClearPool() // create builder with no rate limit and max 10 tx/collection - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(0), ) @@ -612,13 +708,13 @@ func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { parentID := suite.genesis.ID() for i := 0; i < 10; i++ { header, err := suite.builder.BuildOn(parentID, noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) parentID = header.ID() // each collection should be full with 10 transactions var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Len(built.Payload.Collection.Transactions, 10) } } @@ -634,7 +730,7 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { suite.ClearPool() // create builder with 5 tx/payer and max 10 tx/collection - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), ) @@ -659,13 +755,13 @@ func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { parentID := suite.genesis.ID() for i := 0; i < 10; i++ { header, err := suite.builder.BuildOn(parentID, noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) parentID = header.ID() // each collection should be full with 10 transactions var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Len(built.Payload.Collection.Transactions, 10) } } @@ -678,7 +774,7 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { suite.ClearPool() // create builder with 5 tx/payer and max 10 tx/collection - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, 
suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), ) @@ -697,13 +793,13 @@ func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { parentID := suite.genesis.ID() for i := 0; i < 10; i++ { header, err := suite.builder.BuildOn(parentID, noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) parentID = header.ID() // each collection should be half-full with 5 transactions var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Len(built.Payload.Collection.Transactions, 5) } } @@ -716,7 +812,7 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { suite.ClearPool() // create builder with .5 tx/payer and max 10 tx/collection - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(.5), ) @@ -736,13 +832,13 @@ func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { parentID := suite.genesis.ID() for i := 0; i < 10; i++ { header, err := suite.builder.BuildOn(parentID, noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) parentID = header.ID() // collections should either be empty or have 1 transaction var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) if i%2 == 0 { suite.Assert().Len(built.Payload.Collection.Transactions, 1) } else { @@ -758,7 +854,7 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { // create builder with 5 tx/payer and max 10 tx/collection // configure an unlimited payer payer := unittest.RandomAddressFixture() - suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + suite.builder, _ = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionSize(10), builder.WithMaxPayerTransactionRate(5), builder.WithUnlimitedPayers(payer), @@ -777,13 +873,13 @@ func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { parentID := suite.genesis.ID() for i := 0; i < 10; i++ { header, err := suite.builder.BuildOn(parentID, noopSetter) - suite.Require().Nil(err) + suite.Require().NoError(err) parentID = header.ID() // each collection should be full with 10 transactions var built model.Block err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) - suite.Assert().Nil(err) + suite.Assert().NoError(err) suite.Assert().Len(built.Payload.Collection.Transactions, 10) } @@ -835,9 +931,9 @@ func benchmarkBuildOn(b *testing.B, size int) { suite.db = unittest.BadgerDB(b, suite.dbdir) defer func() { err = suite.db.Close() - assert.Nil(b, err) + assert.NoError(b, err) err = os.RemoveAll(suite.dbdir) - assert.Nil(b, err) + assert.NoError(b, err) }() metrics := metrics.NewNoopCollector() @@ -850,10 +946,10 @@ func benchmarkBuildOn(b *testing.B, size int) { stateRoot, err := clusterkv.NewStateRoot(suite.genesis) state, err := clusterkv.Bootstrap(suite.db, stateRoot) - assert.Nil(b, err) + assert.NoError(b, err) suite.state, err = clusterkv.NewMutableState(state, tracer, suite.headers, suite.payloads) - assert.Nil(b, err) + assert.NoError(b, err) // add some 
transactions to transaction pool for i := 0; i < 3; i++ { @@ -863,7 +959,7 @@ func benchmarkBuildOn(b *testing.B, size int) { } // create the builder - suite.builder = builder.NewBuilder(suite.db, tracer, suite.headers, suite.headers, suite.payloads, suite.pool) + suite.builder, _ = builder.NewBuilder(suite.db, tracer, suite.headers, suite.headers, suite.payloads, suite.pool) } // create a block history to test performance against @@ -871,12 +967,12 @@ func benchmarkBuildOn(b *testing.B, size int) { for i := 0; i < size; i++ { block := unittest.ClusterBlockWithParent(final) err := suite.db.Update(procedure.InsertClusterBlock(&block)) - require.Nil(b, err) + require.NoError(b, err) // finalize the block 80% of the time, resulting in a fork-rate of 20% if rand.Intn(100) < 80 { err = suite.db.Update(procedure.FinalizeClusterBlock(block.ID())) - require.Nil(b, err) + require.NoError(b, err) final = &block } } @@ -884,6 +980,6 @@ func benchmarkBuildOn(b *testing.B, size int) { b.StartTimer() for n := 0; n < b.N; n++ { _, err := suite.builder.BuildOn(final.ID(), noopSetter) - assert.Nil(b, err) + assert.NoError(b, err) } } diff --git a/module/builder/collection/tx_lookup.go b/module/builder/collection/tx_lookup.go index b99073acf98..628540eafab 100644 --- a/module/builder/collection/tx_lookup.go +++ b/module/builder/collection/tx_lookup.go @@ -7,7 +7,7 @@ import "github.com/onflow/flow-go/model/flow" type transactionLookup struct { // set of transaction IDs in finalized ancestry finalized map[flow.Identifier]struct{} - // set of transaction IDs in unfinalzied ancestry + // set of transaction IDs in unfinalized ancestry unfinalized map[flow.Identifier]struct{} } diff --git a/module/finalizer/collection/finalizer.go b/module/finalizer/collection/finalizer.go index d96bb781589..5096d0c73e7 100644 --- a/module/finalizer/collection/finalizer.go +++ b/module/finalizer/collection/finalizer.go @@ -82,9 +82,9 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { return nil } - // in order to validate the validity of all changes, we need to iterate - // through the blocks that need to be finalized from oldest to youngest; - // we thus start at the youngest remember all of the intermediary steps + // To finalize all blocks from the currently finalized one up to and + // including the current, we first enumerate each of these blocks. 
+ // We start at the youngest block and remember all visited blocks, // while tracing back until we reach the finalized state steps := []*flow.Header{&header} parentID := header.ParentID @@ -102,27 +102,27 @@ // each header, we reconstruct the block and then apply the related // changes to the protocol state for i := len(steps) - 1; i >= 0; i-- { + clusterBlockID := steps[i].ID() // look up the transactions included in the payload step := steps[i] var payload cluster.Payload - err = procedure.RetrieveClusterPayload(step.ID(), &payload)(tx) + err = procedure.RetrieveClusterPayload(clusterBlockID, &payload)(tx) if err != nil { - return fmt.Errorf("could not retrieve cluster payload: %w", err) + return fmt.Errorf("could not retrieve payload for cluster block (id=%x): %w", clusterBlockID, err) } // remove the transactions from the memory pool for _, colTx := range payload.Collection.Transactions { txID := colTx.ID() - // ignore result -- we don't care whether the transaction was - // in the pool or not + // ignore result -- we don't care whether the transaction was in the pool _ = f.transactions.Rem(txID) } // finalize the block in cluster state - err = procedure.FinalizeClusterBlock(step.ID())(tx) + err = procedure.FinalizeClusterBlock(clusterBlockID)(tx) if err != nil { - return fmt.Errorf("could not finalize block: %w", err) + return fmt.Errorf("could not finalize cluster block (id=%x): %w", clusterBlockID, err) } block := &cluster.Block{ @@ -131,11 +131,24 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error { } f.metrics.ClusterBlockFinalized(block) - // don't bother submitting empty collections + // if the finalized collection is empty, we don't need to include it + // in the reference height index or submit it to consensus nodes if len(payload.Collection.Transactions) == 0 { continue } + // look up the reference block height to populate the index + var refBlock flow.Header + err = operation.RetrieveHeader(payload.ReferenceBlockID, &refBlock)(tx) + if err != nil { + return fmt.Errorf("could not retrieve reference block (id=%x): %w", payload.ReferenceBlockID, err) + } + // index the finalized cluster block by reference block height + err = operation.IndexClusterBlockByReferenceHeight(refBlock.Height, clusterBlockID)(tx) + if err != nil { + return fmt.Errorf("could not index cluster block (id=%x) by reference height (%d): %w", clusterBlockID, refBlock.Height, err) + } + //TODO when we incorporate HotStuff AND require BFT, the consensus // node will need to be able to ensure finalization by checking a // 3-chain of children for this block.
Probably it will be simplest diff --git a/module/finalizer/collection/finalizer_test.go b/module/finalizer/collection/finalizer_test.go index a2d306a09de..dc435210d9d 100644 --- a/module/finalizer/collection/finalizer_test.go +++ b/module/finalizer/collection/finalizer_test.go @@ -18,6 +18,7 @@ import ( "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/mocknetwork" cluster "github.com/onflow/flow-go/state/cluster/badger" + "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" "github.com/onflow/flow-go/utils/unittest" ) @@ -28,6 +29,9 @@ func TestFinalizer(t *testing.T) { // seed the RNG rand.Seed(time.Now().UnixNano()) + // reference block on the main consensus chain + refBlock := unittest.BlockHeaderFixture() + // genesis block for the cluster chain genesis := model.Genesis() metrics := metrics.NewNoopCollector() @@ -53,6 +57,8 @@ func TestFinalizer(t *testing.T) { require.NoError(t, err) state, err = cluster.Bootstrap(db, stateRoot) require.NoError(t, err) + err = db.Update(operation.InsertHeader(refBlock.ID(), &refBlock)) + require.NoError(t, err) } // a helper function to insert a block @@ -88,7 +94,7 @@ func TestFinalizer(t *testing.T) { // create a new block on genesis block := unittest.ClusterBlockWithParent(genesis) - block.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx1)) + block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block) // finalize the block @@ -111,6 +117,7 @@ func TestFinalizer(t *testing.T) { // create a new block that isn't connected to a parent block := unittest.ClusterBlockWithParent(genesis) block.Header.ParentID = unittest.IdentifierFixture() + block.SetPayload(model.EmptyPayload(refBlock.ID())) insert(block) // try to finalize - this should fail @@ -127,7 +134,7 @@ func TestFinalizer(t *testing.T) { // create a block with empty payload on genesis block := unittest.ClusterBlockWithParent(genesis) - block.SetPayload(model.EmptyPayload(flow.ZeroID)) + block.SetPayload(model.EmptyPayload(refBlock.ID())) insert(block) // finalize the block @@ -160,7 +167,7 @@ func TestFinalizer(t *testing.T) { // create a block containing tx1 on top of genesis block := unittest.ClusterBlockWithParent(genesis) - block.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx1)) + block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block) // finalize the block @@ -176,20 +183,21 @@ func TestFinalizer(t *testing.T) { final, err := state.Final().Head() assert.Nil(t, err) assert.Equal(t, block.ID(), final.ID()) + assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, final.ID()) // block should be passed to provider prov.AssertNumberOfCalls(t, "SubmitLocal", 1) prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ Guarantee: flow.CollectionGuarantee{ - CollectionID: block.Payload.Collection.ID(), - SignerIDs: block.Header.ParentVoterIDs, - Signature: block.Header.ParentVoterSigData, + CollectionID: block.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + SignerIDs: block.Header.ParentVoterIDs, + Signature: block.Header.ParentVoterSigData, }, }) }) - // when finalizing a block with un-finalized ancestors, those ancestors - // should be finalized as well + // when finalizing a block with un-finalized ancestors, those ancestors should be finalized as well t.Run("finalize multiple blocks together", func(t *testing.T) { bootstrap() defer cleanup() @@ -207,12 +215,12 @@ func TestFinalizer(t *testing.T) { // 
create a block containing tx1 on top of genesis block1 := unittest.ClusterBlockWithParent(genesis) - block1.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx1)) + block1.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block1) // create a block containing tx2 on top of block1 block2 := unittest.ClusterBlockWithParent(&block1) - block2.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx2)) + block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) // finalize block2 (should indirectly finalize block1 as well) @@ -227,21 +235,24 @@ func TestFinalizer(t *testing.T) { final, err := state.Final().Head() assert.Nil(t, err) assert.Equal(t, block2.ID(), final.ID()) + assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID(), block2.ID()) // both blocks should be passed to provider prov.AssertNumberOfCalls(t, "SubmitLocal", 2) prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - SignerIDs: block1.Header.ParentVoterIDs, - Signature: block1.Header.ParentVoterSigData, + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + SignerIDs: block1.Header.ParentVoterIDs, + Signature: block1.Header.ParentVoterSigData, }, }) prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ Guarantee: flow.CollectionGuarantee{ - CollectionID: block2.Payload.Collection.ID(), - SignerIDs: block2.Header.ParentVoterIDs, - Signature: block2.Header.ParentVoterSigData, + CollectionID: block2.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + SignerIDs: block2.Header.ParentVoterIDs, + Signature: block2.Header.ParentVoterSigData, }, }) }) @@ -263,12 +274,12 @@ func TestFinalizer(t *testing.T) { // create a block containing tx1 on top of genesis block1 := unittest.ClusterBlockWithParent(genesis) - block1.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx1)) + block1.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block1) // create a block containing tx2 on top of block1 block2 := unittest.ClusterBlockWithParent(&block1) - block2.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx2)) + block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) // finalize block1 (should NOT finalize block2) @@ -284,20 +295,21 @@ func TestFinalizer(t *testing.T) { final, err := state.Final().Head() assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) + assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) // block should be passed to provider prov.AssertNumberOfCalls(t, "SubmitLocal", 1) prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - SignerIDs: block1.Header.ParentVoterIDs, - Signature: block1.Header.ParentVoterSigData, + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + SignerIDs: block1.Header.ParentVoterIDs, + Signature: block1.Header.ParentVoterSigData, }, }) }) - // when finalizing a block with a conflicting fork, the fork should - // not be finalized. + // when finalizing a block with a conflicting fork, the fork should not be finalized. 
t.Run("conflicting fork", func(t *testing.T) { bootstrap() defer cleanup() @@ -315,15 +327,15 @@ func TestFinalizer(t *testing.T) { // create a block containing tx1 on top of genesis block1 := unittest.ClusterBlockWithParent(genesis) - block1.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx1)) + block1.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1)) insert(block1) // create a block containing tx2 on top of genesis (conflicting with block1) block2 := unittest.ClusterBlockWithParent(genesis) - block2.SetPayload(model.PayloadFromTransactions(flow.ZeroID, &tx2)) + block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2)) insert(block2) - // finalize block2 + // finalize block1 err := finalizer.MakeFinal(block1.ID()) assert.Nil(t, err) @@ -336,16 +348,28 @@ func TestFinalizer(t *testing.T) { final, err := state.Final().Head() assert.Nil(t, err) assert.Equal(t, block1.ID(), final.ID()) + assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID()) // block should be passed to provider prov.AssertNumberOfCalls(t, "SubmitLocal", 1) prov.AssertCalled(t, "SubmitLocal", &messages.SubmitCollectionGuarantee{ Guarantee: flow.CollectionGuarantee{ - CollectionID: block1.Payload.Collection.ID(), - SignerIDs: block1.Header.ParentVoterIDs, - Signature: block1.Header.ParentVoterSigData, + CollectionID: block1.Payload.Collection.ID(), + ReferenceBlockID: refBlock.ID(), + SignerIDs: block1.Header.ParentVoterIDs, + Signature: block1.Header.ParentVoterSigData, }, }) }) }) } + +// assertClusterBlocksIndexedByReferenceHeight checks the given cluster blocks have +// been indexed by the given reference block height, which is expected as part of +// finalization. +func assertClusterBlocksIndexedByReferenceHeight(t *testing.T, db *badger.DB, refHeight uint64, clusterBlockIDs ...flow.Identifier) { + var ids []flow.Identifier + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(refHeight, refHeight, &ids)) + require.NoError(t, err) + assert.ElementsMatch(t, clusterBlockIDs, ids) +} diff --git a/state/cluster/badger/mutator.go b/state/cluster/badger/mutator.go index 46fa121dbb1..8d8909305b8 100644 --- a/state/cluster/badger/mutator.go +++ b/state/cluster/badger/mutator.go @@ -13,6 +13,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/trace" "github.com/onflow/flow-go/state" + "github.com/onflow/flow-go/state/fork" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/badger/operation" "github.com/onflow/flow-go/storage/badger/procedure" @@ -64,11 +65,16 @@ func (m *MutableState) Extend(block *cluster.Block) error { // get the chain ID, which determines which cluster state to query chainID := header.ChainID - // get the latest finalized block - var final flow.Header - err := procedure.RetrieveLatestFinalizedClusterHeader(chainID, &final)(tx) + // get the latest finalized cluster block and latest finalized consensus height + var finalizedClusterBlock flow.Header + err := procedure.RetrieveLatestFinalizedClusterHeader(chainID, &finalizedClusterBlock)(tx) if err != nil { - return fmt.Errorf("could not retrieve finalized head: %w", err) + return fmt.Errorf("could not retrieve finalized cluster head: %w", err) + } + var finalizedConsensusHeight uint64 + err = operation.RetrieveFinalizedHeight(&finalizedConsensusHeight)(tx) + if err != nil { + return fmt.Errorf("could not retrieve finalized height on consensus chain: %w", err) } // get the header of the parent of the new block @@ -92,7 +98,7 @@ func (m 
*MutableState) Extend(block *cluster.Block) error { // start with the extending block's parent parentID := header.ParentID - for parentID != final.ID() { + for parentID != finalizedClusterBlock.ID() { // get the parent of current block ancestor, err := m.headers.ByBlockID(parentID) @@ -102,9 +108,9 @@ func (m *MutableState) Extend(block *cluster.Block) error { // if its height is below current boundary, the block does not connect // to the finalized protocol state and would break database consistency - if ancestor.Height < final.Height { - return state.NewOutdatedExtensionErrorf("block doesn't connect to finalized state. ancestor.Height (%v), final.Height (%v)", - ancestor.Height, final.Height) + if ancestor.Height < finalizedClusterBlock.Height { + return state.NewOutdatedExtensionErrorf("block doesn't connect to finalized state. ancestor.Height (%d), final.Height (%d)", + ancestor.Height, finalizedClusterBlock.Height) } parentID = ancestor.ParentID @@ -117,6 +123,7 @@ func (m *MutableState) Extend(block *cluster.Block) error { // check that all transactions within the collection are valid minRefID := flow.ZeroID minRefHeight := uint64(math.MaxUint64) + maxRefHeight := uint64(0) for _, flowTx := range payload.Collection.Transactions { refBlock, err := m.headers.ByBlockID(flowTx.ReferenceBlockID) if errors.Is(err, storage.ErrNotFound) { @@ -131,6 +138,9 @@ func (m *MutableState) Extend(block *cluster.Block) error { minRefHeight = refBlock.Height minRefID = flowTx.ReferenceBlockID } + if refBlock.Height > maxRefHeight { + maxRefHeight = refBlock.Height + } } // a valid collection must reference the oldest reference block among @@ -141,6 +151,14 @@ func (m *MutableState) Extend(block *cluster.Block) error { payload.ReferenceBlockID, minRefID, ) } + // a valid collection must contain only transactions within its expiry window + if payload.Collection.Len() > 0 { + if maxRefHeight-minRefHeight >= flow.DefaultTransactionExpiry { + return state.NewInvalidExtensionErrorf( + "collection contains reference height range [%d,%d] exceeding expiry window size: %d", + minRefHeight, maxRefHeight, flow.DefaultTransactionExpiry) + } + } // a valid collection must reference a valid reference block // NOTE: it is valid for a collection to be expired at this point, @@ -156,50 +174,32 @@ func (m *MutableState) Extend(block *cluster.Block) error { // TODO ensure the reference block is part of the main chain _ = refBlock - // we go back a fixed number of blocks to check payload for now - // TODO look back based on reference block ID and expiry https://github.com/dapperlabs/flow-go/issues/3556 - limit := block.Header.Height - flow.DefaultTransactionExpiry - if limit > block.Header.Height { // overflow check - limit = 0 - } - // check for duplicate transactions in block's ancestry txLookup := make(map[flow.Identifier]struct{}) for _, tx := range block.Payload.Collection.Transactions { - txLookup[tx.ID()] = struct{}{} - } - - var duplicateTxIDs flow.IdentifierList - ancestorID := block.Header.ParentID - for { - ancestor, err := m.headers.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not retrieve ancestor header: %w", err) - } - - if ancestor.Height <= limit { - break - } - - payload, err := m.payloads.ByBlockID(ancestorID) - if err != nil { - return fmt.Errorf("could not retrieve ancestor payload: %w", err) + txID := tx.ID() + if _, exists := txLookup[txID]; exists { + return state.NewInvalidExtensionErrorf("collection contains transaction (id=%x) more than once", txID) } + txLookup[txID] = 
struct{}{} + } - for _, tx := range payload.Collection.Transactions { - txID := tx.ID() - _, duplicated := txLookup[txID] - if duplicated { - duplicateTxIDs = append(duplicateTxIDs, txID) - } - } - ancestorID = ancestor.ParentID + // first, check for duplicate transactions in the un-finalized ancestry + duplicateTxIDs, err := m.checkDupeTransactionsInUnfinalizedAncestry(block, txLookup, finalizedClusterBlock.Height) + if err != nil { + return fmt.Errorf("could not check for duplicate txs in un-finalized ancestry: %w", err) + } + if len(duplicateTxIDs) > 0 { + return state.NewInvalidExtensionErrorf("payload includes duplicate transactions in un-finalized ancestry (duplicates: %s)", duplicateTxIDs) } - // if we have duplicate transactions, fail + // second, check for duplicate transactions in the finalized ancestry + duplicateTxIDs, err = m.checkDupeTransactionsInFinalizedAncestry(txLookup, minRefHeight, maxRefHeight) + if err != nil { + return fmt.Errorf("could not check for duplicate txs in finalized ancestry: %w", err) + } if len(duplicateTxIDs) > 0 { - return state.NewInvalidExtensionErrorf("payload includes duplicate transactions (duplicates: %s)", - duplicateTxIDs) + return state.NewInvalidExtensionErrorf("payload includes duplicate transactions in finalized ancestry (duplicates: %s)", duplicateTxIDs) } return nil @@ -218,3 +218,81 @@ func (m *MutableState) Extend(block *cluster.Block) error { } return nil } + +// checkDupeTransactionsInUnfinalizedAncestry checks for duplicate transactions in the un-finalized +// ancestry of the given block, and returns a list of all duplicates if there are any. +func (m *MutableState) checkDupeTransactionsInUnfinalizedAncestry(block *cluster.Block, includedTransactions map[flow.Identifier]struct{}, finalHeight uint64) ([]flow.Identifier, error) { + + var duplicateTxIDs []flow.Identifier + err := fork.TraverseBackward(m.headers, block.Header.ParentID, func(ancestor *flow.Header) error { + payload, err := m.payloads.ByBlockID(ancestor.ID()) + if err != nil { + return fmt.Errorf("could not retrieve ancestor payload: %w", err) + } + + for _, tx := range payload.Collection.Transactions { + txID := tx.ID() + _, duplicated := includedTransactions[txID] + if duplicated { + duplicateTxIDs = append(duplicateTxIDs, txID) + } + } + return nil + }, fork.ExcludingHeight(finalHeight)) + + return duplicateTxIDs, err +} + +// checkDupeTransactionsInFinalizedAncestry checks for duplicate transactions in the finalized +// ancestry, and returns a list of all duplicates if there are any. +func (m *MutableState) checkDupeTransactionsInFinalizedAncestry(includedTransactions map[flow.Identifier]struct{}, minRefHeight, maxRefHeight uint64) ([]flow.Identifier, error) { + var duplicatedTxIDs []flow.Identifier + + // Let E be the global transaction expiry constant, measured in blocks. For each + // T ∈ `includedTransactions`, we have to decide whether the transaction + // already appeared in _any_ finalized cluster block. + // Notation: + // - consider a valid cluster block C and let c be its reference block height + // - consider a transaction T ∈ `includedTransactions` and let t denote its + // reference block height + // + // Boundary conditions: + // 1. C's reference block height is equal to the lowest reference block height of + // all its constituent transactions. Hence, for collection C to potentially contain T, it must satisfy c <= t. + // 2. For T to be eligible for inclusion in collection C, _none_ of the transactions within C are allowed + // to be expired w.r.t. 
C's reference block. Hence, for collection C to potentially contain T, it must satisfy t < c + E. + // + // Therefore, for collection C to potentially contain transaction T, it must satisfy t - E < c <= t. + // In other words, we only need to inspect collections with reference block height c ∈ (t-E, t]. + // Consequently, for a set of transactions, with `minRefHeight` (`maxRefHeight`) being the smallest (largest) + // reference block height, we only need to inspect collections with c ∈ (minRefHeight-E, maxRefHeight]. + + // the finalized cluster blocks which could possibly contain any conflicting transactions + var clusterBlockIDs []flow.Identifier + start := minRefHeight - flow.DefaultTransactionExpiry + 1 + if start > minRefHeight { + start = 0 // overflow check + } + end := maxRefHeight + err := m.db.View(operation.LookupClusterBlocksByReferenceHeightRange(start, end, &clusterBlockIDs)) + if err != nil { + return nil, fmt.Errorf("could not lookup finalized cluster blocks by reference height range [%d,%d]: %w", start, end, err) + } + + for _, blockID := range clusterBlockIDs { + // TODO: could add LightByBlockID and retrieve only tx IDs + payload, err := m.payloads.ByBlockID(blockID) + if err != nil { + return nil, fmt.Errorf("could not retrieve cluster payload (block_id=%x) to de-duplicate: %w", blockID, err) + } + for _, tx := range payload.Collection.Transactions { + txID := tx.ID() + _, duplicated := includedTransactions[txID] + if duplicated { + duplicatedTxIDs = append(duplicatedTxIDs, txID) + } + } + } + + return duplicatedTxIDs, nil +} diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index 1de0cbff7e0..467c8b1cc1c 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/dgraph-io/badger/v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -133,6 +134,23 @@ func (suite *MutatorSuite) Block() model.Block { return suite.BlockWithParent(suite.genesis) } +func (suite *MutatorSuite) FinalizeBlock(block model.Block) { + err := suite.db.Update(func(tx *badger.Txn) error { + var refBlock flow.Header + err := operation.RetrieveHeader(block.Payload.ReferenceBlockID, &refBlock)(tx) + if err != nil { + return err + } + err = procedure.FinalizeClusterBlock(block.ID())(tx) + if err != nil { + return err + } + err = operation.IndexClusterBlockByReferenceHeight(refBlock.Height, block.ID())(tx) + return err + }) + suite.Assert().NoError(err) +} + func (suite *MutatorSuite) Tx(opts ...func(*flow.TransactionBody)) flow.TransactionBody { final, err := suite.protoState.Final().Head() suite.Require().Nil(err) @@ -237,6 +255,18 @@ func (suite *MutatorSuite) TestExtend_InvalidBlockNumber() { suite.Assert().Error(err) } +func (suite *MutatorSuite) TestExtend_DuplicateTxInPayload() { + block := suite.Block() + // add the same transaction to a payload twice + tx := suite.Tx() + payload := suite.Payload(&tx, &tx) + block.SetPayload(payload) + + // should fail to extend block with invalid payload + err := suite.state.Extend(&block) + suite.Assert().Error(err) +} + func (suite *MutatorSuite) TestExtend_OnParentOfFinalized() { // build one block on top of genesis block1 := suite.Block() @@ -244,8 +274,7 @@ func (suite *MutatorSuite) TestExtend_OnParentOfFinalized() { suite.Assert().Nil(err) // finalize the block - err = suite.db.Update(procedure.FinalizeClusterBlock(block1.ID())) - suite.Assert().Nil(err) + 
suite.FinalizeBlock(block1) // insert another block on top of genesis // since we have already finalized block 1, this is invalid @@ -362,7 +391,7 @@ func (suite *MutatorSuite) TestExtend_FinalizedBlockWithDupeTx() { suite.Assert().Nil(err) // should be able to finalize block 1 - err = suite.db.Update(procedure.FinalizeClusterBlock(block1.ID())) + suite.FinalizeBlock(block1) suite.Assert().Nil(err) // create a block building on block1 ALSO containing tx1 @@ -397,3 +426,87 @@ func (suite *MutatorSuite) TestExtend_ConflictingForkWithDupeTx() { err = suite.state.Extend(&block2) suite.Assert().Nil(err) } + +func (suite *MutatorSuite) TestExtend_LargeHistory() { + t := suite.T() + + // get a valid reference block ID + final, err := suite.protoState.Final().Head() + require.NoError(t, err) + refID := final.ID() + + // keep track of the head of the chain + head := *suite.genesis + + // keep track of transactions in orphaned forks (eligible for inclusion in future block) + var invalidatedTransactions []*flow.TransactionBody + // keep track of the oldest transactions (further back in ancestry than the expiry window) + var oldTransactions []*flow.TransactionBody + + // create a large history of blocks with invalidated forks every 3 blocks on + // average - build until the height exceeds transaction expiry + for i := 0; ; i++ { + + // create a transaction + tx := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { + tx.ReferenceBlockID = refID + tx.ProposalKey.SequenceNumber = uint64(i) + }) + + // 1/3 of the time create a conflicting fork that will be invalidated + // don't do this the first and last few times to ensure we don't + // try to fork genesis and the last block is the valid fork. + conflicting := rand.Intn(3) == 0 && i > 5 && i < 995 + + // by default, build on the head - if we are building a + // conflicting fork, build on the parent of the head + parent := head + if conflicting { + err = suite.db.View(procedure.RetrieveClusterBlock(parent.Header.ParentID, &parent)) + assert.NoError(t, err) + // add the transaction to the invalidated list + invalidatedTransactions = append(invalidatedTransactions, &tx) + } else if head.Header.Height < 50 { + oldTransactions = append(oldTransactions, &tx) + } + + // create a block containing the transaction + block := unittest.ClusterBlockWithParent(&head) + payload := suite.Payload(&tx) + block.SetPayload(payload) + err = suite.state.Extend(&block) + assert.NoError(t, err) + + // reset the valid head if we aren't building a conflicting fork + if !conflicting { + head = block + suite.FinalizeBlock(block) + } + + // stop building blocks once we've built a history which exceeds the transaction + // expiry length - this tests that deduplication works properly against old blocks + // which nevertheless have a potentially conflicting reference block + if head.Header.Height > flow.DefaultTransactionExpiry+100 { + break + } + } + + t.Log("conflicting: ", len(invalidatedTransactions)) + + t.Run("should be able to extend with transactions in orphaned forks", func(t *testing.T) { + block := unittest.ClusterBlockWithParent(&head) + payload := suite.Payload(invalidatedTransactions...) + block.SetPayload(payload) + err = suite.state.Extend(&block) + assert.NoError(t, err) + }) + + t.Run("should be unable to extend with conflicting transactions within reference height range of extending block", func(t *testing.T) { + block := unittest.ClusterBlockWithParent(&head) + payload := suite.Payload(oldTransactions...)
+ block.SetPayload(payload) + err = suite.state.Extend(&block) + assert.Error(t, err) + }) +} diff --git a/storage/badger/headers.go b/storage/badger/headers.go index 4742004ea0e..b5c85ce5657 100644 --- a/storage/badger/headers.go +++ b/storage/badger/headers.go @@ -138,6 +138,17 @@ func (h *Headers) ByHeight(height uint64) (*flow.Header, error) { return h.retrieveTx(blockID)(tx) } +// BlockIDByHeight returns the ID of the block finalized at the given height. +func (h *Headers) BlockIDByHeight(height uint64) (flow.Identifier, error) { + tx := h.db.NewTransaction(false) + defer tx.Discard() + + blockID, err := h.retrieveIdByHeightTx(height)(tx) + if err != nil { + return flow.ZeroID, fmt.Errorf("could not lookup block id by height %d: %w", height, err) + } + return blockID, nil +} + func (h *Headers) ByParentID(parentID flow.Identifier) ([]*flow.Header, error) { var blockIDs []flow.Identifier err := h.db.View(procedure.LookupBlockChildren(parentID, &blockIDs)) diff --git a/storage/badger/operation/cluster.go b/storage/badger/operation/cluster.go index 9ec9c93d53a..fdf80d30db2 100644 --- a/storage/badger/operation/cluster.go +++ b/storage/badger/operation/cluster.go @@ -37,14 +37,46 @@ func RetrieveClusterFinalizedHeight(clusterID flow.ChainID, number *uint64) func return retrieve(makePrefix(codeClusterHeight, clusterID), number) } -// IndexCollectionReference inserts the reference block ID for a cluster -// block payload (ie. collection) keyed by the cluster block ID -func IndexCollectionReference(clusterBlockID, refID flow.Identifier) func(*badger.Txn) error { - return insert(makePrefix(codeCollectionReference, clusterBlockID), refID) +// IndexReferenceBlockByClusterBlock inserts the reference block ID for the given +// cluster block ID. While each cluster block specifies a reference block in its +// payload, we maintain this additional lookup for performance reasons. +func IndexReferenceBlockByClusterBlock(clusterBlockID, refID flow.Identifier) func(*badger.Txn) error { + return insert(makePrefix(codeClusterBlockToRefBlock, clusterBlockID), refID) } -// LookupCollectionReference looks up the reference block ID for a cluster -// block payload (ie. collection) keyed by the cluster block ID. -func LookupCollectionReference(clusterBlockID flow.Identifier, refID *flow.Identifier) func(*badger.Txn) error { - return retrieve(makePrefix(codeCollectionReference, clusterBlockID), refID) +// LookupReferenceBlockByClusterBlock looks up the reference block ID for the given +// cluster block ID. While each cluster block specifies a reference block in its +// payload, we maintain this additional lookup for performance reasons. +func LookupReferenceBlockByClusterBlock(clusterBlockID flow.Identifier, refID *flow.Identifier) func(*badger.Txn) error { + return retrieve(makePrefix(codeClusterBlockToRefBlock, clusterBlockID), refID) +} + +// IndexClusterBlockByReferenceHeight indexes a cluster block ID by its reference +// block height. The cluster block ID is included in the key for more efficient +// traversal. Only finalized cluster blocks should be included in this index. +// The key looks like: <code prefix (1 byte)><reference block height (8 bytes)><cluster block ID (32 bytes)> +func IndexClusterBlockByReferenceHeight(refHeight uint64, clusterBlockID flow.Identifier) func(*badger.Txn) error { + return insert(makePrefix(codeRefHeightToClusterBlock, refHeight, clusterBlockID), nil) +} + +// LookupClusterBlocksByReferenceHeightRange traverses the ref_height->cluster_block +// index and returns any finalized cluster blocks which have a reference block with +// height in the given range.
This is used to avoid including duplicate transactions +// when building or validating a new collection. +func LookupClusterBlocksByReferenceHeightRange(start, end uint64, clusterBlockIDs *[]flow.Identifier) func(*badger.Txn) error { + startPrefix := makePrefix(codeRefHeightToClusterBlock, start) + endPrefix := makePrefix(codeRefHeightToClusterBlock, end) + + return iterate(startPrefix, endPrefix, func() (checkFunc, createFunc, handleFunc) { + check := func(key []byte) bool { + clusterBlockIDBytes := key[9:] + var clusterBlockID flow.Identifier + copy(clusterBlockID[:], clusterBlockIDBytes) + *clusterBlockIDs = append(*clusterBlockIDs, clusterBlockID) + + // the info we need is stored in the key, never process the value + return false + } + return check, nil, nil + }, withPrefetchValuesFalse) } diff --git a/storage/badger/operation/cluster_test.go b/storage/badger/operation/cluster_test.go index efb96a99552..9a616e08490 100644 --- a/storage/badger/operation/cluster_test.go +++ b/storage/badger/operation/cluster_test.go @@ -3,10 +3,12 @@ package operation_test import ( "errors" "fmt" + "math/rand" "testing" "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/storage" @@ -109,3 +111,203 @@ func TestClusterBoundaries(t *testing.T) { }) }) } + +func TestClusterBlockByReferenceHeight(t *testing.T) { + + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + t.Run("should be able to index cluster block by reference height", func(t *testing.T) { + id := unittest.IdentifierFixture() + height := rand.Uint64() + err := db.Update(operation.IndexClusterBlockByReferenceHeight(height, id)) + assert.NoError(t, err) + + var retrieved []flow.Identifier + err = db.View(operation.LookupClusterBlocksByReferenceHeightRange(height, height, &retrieved)) + assert.NoError(t, err) + require.Len(t, retrieved, 1) + assert.Equal(t, id, retrieved[0]) + }) + }) + + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + t.Run("should be able to index multiple cluster blocks at same reference height", func(t *testing.T) { + ids := unittest.IdentifierListFixture(10) + height := rand.Uint64() + for _, id := range ids { + err := db.Update(operation.IndexClusterBlockByReferenceHeight(height, id)) + assert.NoError(t, err) + } + + var retrieved []flow.Identifier + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(height, height, &retrieved)) + assert.NoError(t, err) + assert.Len(t, retrieved, len(ids)) + assert.ElementsMatch(t, ids, retrieved) + }) + }) + + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + t.Run("should be able to lookup cluster blocks across height range", func(t *testing.T) { + ids := unittest.IdentifierListFixture(100) + nextHeight := rand.Uint64() + // keep track of height range + minHeight, maxHeight := nextHeight, nextHeight + // keep track of which ids are indexed at each height + lookup := make(map[uint64][]flow.Identifier) + + for i := 0; i < len(ids); i++ { + // randomly adjust nextHeight, increasing on average + r := rand.Intn(100) + if r < 20 { + nextHeight -= 1 // 20% + } else if r < 40 { + // nextHeight stays the same - 20% + } else if r < 80 { + nextHeight += 1 // 40% + } else { + nextHeight += 2 // 20% + } + + lookup[nextHeight] = append(lookup[nextHeight], ids[i]) + if nextHeight < minHeight { + minHeight = nextHeight + } + if nextHeight > maxHeight { + maxHeight = nextHeight + } + + err :=
db.Update(operation.IndexClusterBlockByReferenceHeight(nextHeight, ids[i])) + assert.NoError(t, err) + } + + // determine which ids we expect to be retrieved for a given height range + idsInHeightRange := func(min, max uint64) []flow.Identifier { + var idsForHeight []flow.Identifier + for height, id := range lookup { + if min <= height && height <= max { + idsForHeight = append(idsForHeight, id...) + } + } + return idsForHeight + } + + // Test cases are described as follows: + // {---} represents the queried height range + // [---] represents the indexed height range + // [{ means the left endpoint of both ranges is the same + // {-[ means the left endpoint of the queried range is strictly less than the indexed range + t.Run("{-}--[-]", func(t *testing.T) { + var retrieved []flow.Identifier + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(minHeight-100, minHeight-1, &retrieved)) + assert.NoError(t, err) + assert.Len(t, retrieved, 0) + }) + t.Run("{-[--}-]", func(t *testing.T) { + var retrieved []flow.Identifier + min := minHeight - 100 + max := minHeight + (maxHeight-minHeight)/2 + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(min, max, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(min, max) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t, retrieved, len(expected)) + assert.ElementsMatch(t, expected, retrieved) + }) + t.Run("{[--}--]", func(t *testing.T) { + var retrieved []flow.Identifier + min := minHeight + max := minHeight + (maxHeight-minHeight)/2 + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(min, max, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(min, max) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t, retrieved, len(expected)) + assert.ElementsMatch(t, expected, retrieved) + }) + t.Run("[-{--}-]", func(t *testing.T) { + var retrieved []flow.Identifier + min := minHeight + 1 + max := maxHeight - 1 + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(min, max, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(min, max) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t, retrieved, len(expected)) + assert.ElementsMatch(t, expected, retrieved) + }) + t.Run("[{----}]", func(t *testing.T) { + var retrieved []flow.Identifier + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(minHeight, maxHeight, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(minHeight, maxHeight) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t, retrieved, len(expected)) + assert.ElementsMatch(t, expected, retrieved) + }) + t.Run("[--{--}]", func(t *testing.T) { + var retrieved []flow.Identifier + min := minHeight + (maxHeight-minHeight)/2 + max := maxHeight + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(min, max, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(min, max) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t, retrieved, len(expected)) + assert.ElementsMatch(t, expected, retrieved) + }) + t.Run("[-{--]-}", func(t *testing.T) { + var retrieved []flow.Identifier + min := minHeight + (maxHeight-minHeight)/2 + max := maxHeight + 100 + err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(min, max, &retrieved)) + assert.NoError(t, err) + + expected := idsInHeightRange(min, max) + assert.NotEmpty(t, expected, "test assumption broken") + assert.Len(t,
+				assert.Len(t, retrieved, len(expected))
+				assert.ElementsMatch(t, expected, retrieved)
+			})
+			t.Run("[-]--{-}", func(t *testing.T) {
+				var retrieved []flow.Identifier
+				err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(maxHeight+1, maxHeight+100, &retrieved))
+				assert.NoError(t, err)
+				assert.Len(t, retrieved, 0)
+			})
+		})
+	})
+}
+
+// expected average-case number of blocks to look up on Mainnet
+func BenchmarkLookupClusterBlocksByReferenceHeightRange_1200(b *testing.B) {
+	benchmarkLookupClusterBlocksByReferenceHeightRange(b, 1200)
+}
+
+// 5x the average case on Mainnet
+func BenchmarkLookupClusterBlocksByReferenceHeightRange_6_000(b *testing.B) {
+	benchmarkLookupClusterBlocksByReferenceHeightRange(b, 6_000)
+}
+
+func BenchmarkLookupClusterBlocksByReferenceHeightRange_100_000(b *testing.B) {
+	benchmarkLookupClusterBlocksByReferenceHeightRange(b, 100_000)
+}
+
+func benchmarkLookupClusterBlocksByReferenceHeightRange(b *testing.B, n int) {
+	unittest.RunWithBadgerDB(b, func(db *badger.DB) {
+		for i := 0; i < n; i++ {
+			err := db.Update(operation.IndexClusterBlockByReferenceHeight(rand.Uint64()%1000, unittest.IdentifierFixture()))
+			require.NoError(b, err)
+		}
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			var blockIDs []flow.Identifier
+			err := db.View(operation.LookupClusterBlocksByReferenceHeightRange(0, 1000, &blockIDs))
+			require.NoError(b, err)
+		}
+	})
+}
diff --git a/storage/badger/operation/common.go b/storage/badger/operation/common.go
index 9c2893b17c8..ce536635f8f 100644
--- a/storage/badger/operation/common.go
+++ b/storage/badger/operation/common.go
@@ -226,6 +226,14 @@ func lookup(entityIDs *[]flow.Identifier) func() (checkFunc, createFunc, handleF
 	}
 }
 
+// withPrefetchValuesFalse configures a Badger iteration to NOT preemptively load
+// the values when iterating over keys (i.e. key-only iteration). Key-only iteration
+// is several orders of magnitude faster than regular iteration, because it involves
+// access to the LSM-tree only, which is usually resident entirely in RAM.
+func withPrefetchValuesFalse(options *badger.IteratorOptions) {
+	options.PrefetchValues = false
+}
+
 // iterate iterates over a range of keys defined by a start and end key. The
 // start key may be higher than the end key, in which case we iterate in
 // reverse order.
@@ -241,12 +249,15 @@ func lookup(entityIDs *[]flow.Identifier) func() (checkFunc, createFunc, handleF
 //
 // TODO: this function is unbounded – pass context.Context to this or calling
 // functions to allow timing functions out.
-func iterate(start []byte, end []byte, iteration iterationFunc) func(*badger.Txn) error {
+func iterate(start []byte, end []byte, iteration iterationFunc, opts ...func(*badger.IteratorOptions)) func(*badger.Txn) error {
 	return func(tx *badger.Txn) error {
 
 		// initialize the default options and comparison modifier for iteration
 		modifier := 1
 		options := badger.DefaultIteratorOptions
+		for _, apply := range opts {
+			apply(&options)
+		}
 
 		// In order to satisfy this function's prefix-wise inclusion semantics,
 		// we append 0xff bytes to the largest of start and end.
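For reference, what withPrefetchValuesFalse amounts to against raw Badger v2: a self-contained sketch of a key-only scan (the seeded keys and the 0x2c prefix byte are illustrative only, not flow-go's real keys):

package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/dgraph-io/badger/v2"
)

func main() {
	dir, err := ioutil.TempDir("", "badger-keyonly")
	if err != nil {
		log.Fatal(err)
	}
	db, err := badger.Open(badger.DefaultOptions(dir))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// seed a few keys; the values are irrelevant to a key-only scan
	err = db.Update(func(txn *badger.Txn) error {
		for i := byte(0); i < 3; i++ {
			if err := txn.Set([]byte{0x2c, i}, []byte("unused")); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	err = db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.PrefetchValues = false // key-only: reads the LSM-tree, never the value log
		it := txn.NewIterator(opts)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			// Item().Key() is only valid until Next(); KeyCopy retains it
			fmt.Printf("key: %x\n", it.Item().KeyCopy(nil))
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}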
diff --git a/storage/badger/operation/prefix.go b/storage/badger/operation/prefix.go
index 68756a79bb4..f4031509bb1 100644
--- a/storage/badger/operation/prefix.go
+++ b/storage/badger/operation/prefix.go
@@ -44,11 +44,12 @@ const (
 	codeResultApproval = 37
 	codeChunk          = 38
 
-	// codes for indexing single identifier by identifier
-	codeHeightToBlock       = 40 // index mapping height to block ID
-	codeBlockToSeal         = 41 // index mapping a block its last payload seal
-	codeCollectionReference = 42 // index reference block ID for collection
-	codeBlockValidity       = 43 // validity of block per HotStuff
+	// codes for indexing single identifier by identifier/integer
+	codeHeightToBlock           = 40 // index mapping height to block ID
+	codeBlockToSeal             = 41 // index mapping a block to its last payload seal
+	codeClusterBlockToRefBlock  = 42 // index cluster block ID to reference block ID
+	codeBlockValidity           = 43 // validity of block per HotStuff
+	codeRefHeightToClusterBlock = 44 // index reference block height to cluster block IDs
 
 	// codes for indexing multiple identifiers by identifier
 	// NOTE: 51 was used for identity indexes before epochs
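Worth noting why codeRefHeightToClusterBlock keys admit a range scan at all: the height that follows the code byte must be encoded big-endian, so that lexicographic key order agrees with numeric height order. This is inferred from the key[9:] slicing above; flow-go's unexported makePrefix is assumed to encode uint64 values this way. A tiny standalone check:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// key mimics the assumed layout: 1-byte code prefix, then 8-byte big-endian height.
func key(height uint64) []byte {
	k := make([]byte, 9)
	k[0] = 44 // codeRefHeightToClusterBlock
	binary.BigEndian.PutUint64(k[1:], height)
	return k
}

func main() {
	// byte-wise comparison agrees with numeric comparison under big-endian encoding
	fmt.Println(bytes.Compare(key(1), key(256)) < 0)   // true
	fmt.Println(bytes.Compare(key(255), key(256)) < 0) // true (little-endian would order these the other way)
}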
diff --git a/storage/badger/procedure/cluster.go b/storage/badger/procedure/cluster.go
index aae609c9577..f51c8597938 100644
--- a/storage/badger/procedure/cluster.go
+++ b/storage/badger/procedure/cluster.go
@@ -180,7 +180,7 @@ func InsertClusterPayload(blockID flow.Identifier, payload *cluster.Payload) fun
 	}
 
 	// insert the reference block ID
-	err = operation.IndexCollectionReference(blockID, payload.ReferenceBlockID)(tx)
+	err = operation.IndexReferenceBlockByClusterBlock(blockID, payload.ReferenceBlockID)(tx)
 	if err != nil {
 		return fmt.Errorf("could not insert reference block ID: %w", err)
 	}
@@ -195,7 +195,7 @@ func RetrieveClusterPayload(blockID flow.Identifier, payload *cluster.Payload) f
 
 	// lookup the reference block ID
 	var refID flow.Identifier
-	err := operation.LookupCollectionReference(blockID, &refID)(tx)
+	err := operation.LookupReferenceBlockByClusterBlock(blockID, &refID)(tx)
 	if err != nil {
 		return fmt.Errorf("could not retrieve reference block ID: %w", err)
 	}
diff --git a/storage/headers.go b/storage/headers.go
index d96cf5724e2..f4273482ed2 100644
--- a/storage/headers.go
+++ b/storage/headers.go
@@ -20,17 +20,17 @@ type Headers interface {
 	// for finalized blocks.
 	ByHeight(height uint64) (*flow.Header, error)
 
-	// Find all children for the given parent block. The returned headers might
-	// be unfinalized; if there is more than one, at least one of them has to
+	// ByParentID finds all children of the given parent block. The returned headers
+	// might be unfinalized; if there is more than one, at least one of them has to
 	// be unfinalized.
 	ByParentID(parentID flow.Identifier) ([]*flow.Header, error)
 
-	// Indexes block ID by chunk ID
+	// IndexByChunkID indexes a block ID by chunk ID.
 	IndexByChunkID(headerID, chunkID flow.Identifier) error
 
-	// Indexes block ID by chunk ID in a given batch
+	// BatchIndexByChunkID indexes a block ID by chunk ID in a given batch.
 	BatchIndexByChunkID(headerID, chunkID flow.Identifier, batch BatchStorage) error
 
-	// Finds the ID of the block corresponding to given chunk ID
+	// IDByChunkID finds the ID of the block corresponding to the given chunk ID.
 	IDByChunkID(chunkID flow.Identifier) (flow.Identifier, error)
 }
diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go
index a6bd0819200..d41ef67fb09 100644
--- a/utils/unittest/fixtures.go
+++ b/utils/unittest/fixtures.go
@@ -408,15 +408,21 @@ func CidFixture() cid.Cid {
 	return blocks.NewBlock(data).Cid()
 }
 
-func BlockHeaderFixtureOnChain(chainID flow.ChainID) flow.Header {
-	height := uint64(rand.Uint32())
+func BlockHeaderFixtureOnChain(chainID flow.ChainID, opts ...func(header *flow.Header)) flow.Header {
+	height := 1 + uint64(rand.Uint32()) // avoiding the edge case of height = 0 (genesis block)
 	view := height + uint64(rand.Intn(1000))
-	return BlockHeaderWithParentFixture(&flow.Header{
+	header := BlockHeaderWithParentFixture(&flow.Header{
 		ChainID:  chainID,
 		ParentID: IdentifierFixture(),
 		Height:   height,
 		View:     view,
 	})
+
+	for _, opt := range opts {
+		opt(&header)
+	}
+
+	return header
 }
 
 func BlockHeaderWithParentFixture(parent *flow.Header) flow.Header {
@@ -603,10 +609,11 @@ func ExecutableBlockFixtureWithParent(collectionsSignerIDs [][]flow.Identifier,
 	return executableBlock
 }
 
-func ExecutableBlockFromTransactions(txss [][]*flow.TransactionBody) *entity.ExecutableBlock {
+func ExecutableBlockFromTransactions(chain flow.ChainID, txss [][]*flow.TransactionBody) *entity.ExecutableBlock {
 	completeCollections := make(map[flow.Identifier]*entity.CompleteCollection, len(txss))
 
-	block := BlockFixture()
+	blockHeader := BlockHeaderFixtureOnChain(chain)
+	block := *BlockWithParentFixture(&blockHeader)
 	block.Payload.Guarantees = nil
 
 	for _, txs := range txss {
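Finally, a hedged sketch of how a test might exercise the new fixture signatures above. The test body, flow.Mainnet, and the single-transaction collection are illustrative; TransactionBodyFixture is assumed to return a flow.TransactionBody by value, as the other fixtures in this package do:

package example_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/utils/unittest"
)

func TestFixtureOptions(t *testing.T) {
	// pin the height via the new functional-options parameter; the rest stays random
	header := unittest.BlockHeaderFixtureOnChain(flow.Mainnet, func(h *flow.Header) {
		h.Height = 42
	})
	require.Equal(t, uint64(42), header.Height)
	require.Equal(t, flow.Mainnet, header.ChainID)

	// ExecutableBlockFromTransactions now takes the chain ID up front as well
	tx := unittest.TransactionBodyFixture()
	block := unittest.ExecutableBlockFromTransactions(flow.Mainnet, [][]*flow.TransactionBody{{&tx}})
	require.Equal(t, flow.Mainnet, block.Block.Header.ChainID)
}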