diff --git a/build/ci.go b/build/ci.go index 9df9ff939d..fb8ab4be91 100644 --- a/build/ci.go +++ b/build/ci.go @@ -923,11 +923,12 @@ func gomobileTool(subcmd string, args ...string) *exec.Cmd { "PATH=" + GOBIN + string(os.PathListSeparator) + os.Getenv("PATH"), } for _, e := range os.Environ() { - if strings.HasPrefix(e, "GOPATH=") || strings.HasPrefix(e, "PATH=") { + if strings.HasPrefix(e, "GOPATH=") || strings.HasPrefix(e, "PATH=") || strings.HasPrefix(e, "GOBIN=") { continue } cmd.Env = append(cmd.Env, e) } + cmd.Env = append(cmd.Env, "GOBIN="+GOBIN) return cmd } @@ -999,7 +1000,7 @@ func doXCodeFramework(cmdline []string) { if *local { // If we're building locally, use the build folder and stop afterwards - bind.Dir, _ = filepath.Abs(GOBIN) + bind.Dir = GOBIN build.MustRun(bind) return } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c0a8eb761b..3927bc53ae 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -37,6 +37,7 @@ import ( mockEngine "github.com/celo-org/celo-blockchain/consensus/consensustest" "github.com/celo-org/celo-blockchain/consensus/istanbul" "github.com/celo-org/celo-blockchain/core" + "github.com/celo-org/celo-blockchain/core/rawdb" "github.com/celo-org/celo-blockchain/core/vm" "github.com/celo-org/celo-blockchain/crypto" "github.com/celo-org/celo-blockchain/eth" @@ -1694,24 +1695,44 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { } // Create new developer account or reuse existing one var ( - developer accounts.Account - err error + developer accounts.Account + passphrase string + err error ) - if accs := ks.Accounts(); len(accs) > 0 { + if list := MakePasswordList(ctx); len(list) > 0 { + // Just take the first value. Although the function returns a possible multiple values and + // some usages iterate through them as attempts, that doesn't make sense in this setting, + // when we're definitely concerned with only one account. + passphrase = list[0] + } + // setValidator has been called above, configuring the miner address from command line flags. + if cfg.Miner.Validator != (common.Address{}) { + developer = accounts.Account{Address: cfg.Miner.Validator} + } else if accs := ks.Accounts(); len(accs) > 0 { developer = ks.Accounts()[0] } else { key, _ := crypto.HexToECDSA("add67e37fdf5c26743d295b1af6d9b50f2785a6b60bc83a8f05bd1dd4b385c6c") - developer, err = ks.ImportECDSA(key, "") + developer, err = ks.ImportECDSA(key, passphrase) if err != nil { Fatalf("Failed to create developer account: %v", err) } } - if err := ks.Unlock(developer, ""); err != nil { + if err := ks.Unlock(developer, passphrase); err != nil { Fatalf("Failed to unlock developer account: %v", err) } log.Info("Using developer account", "address", developer.Address) + // Create a new developer genesis block or reuse existing one cfg.Genesis = core.DeveloperGenesisBlock(uint64(ctx.GlobalInt(DeveloperPeriodFlag.Name)), developer.Address) + if ctx.GlobalIsSet(DataDirFlag.Name) { + // Check if we have an already initialized chain and fall back to + // that if so. Otherwise we need to generate a new genesis spec. + chaindb := MakeChainDatabase(ctx, stack) + if rawdb.ReadCanonicalHash(chaindb, 0) != (common.Hash{}) { + cfg.Genesis = nil // fallback to db content + } + chaindb.Close() + } default: if cfg.NetworkId == params.MainnetNetworkId { setDNSDiscoveryDefaults(cfg, params.MainnetGenesisHash) diff --git a/core/tx_pool.go b/core/tx_pool.go index e9d6670d8f..50ebf4bd50 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -275,7 +275,7 @@ type TxPool struct { queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price. One heap per fee currency. + priced *txPricedList // All transactions sorted by price. One heap per fee currency. chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -772,6 +772,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er if _, exist := pool.beats[from]; !exist { pool.beats[from] = time.Now() } + // If we never record the heartbeat, do it right now. + if _, exist := pool.beats[from]; !exist { + pool.beats[from] = time.Now() + } return old != nil, nil } diff --git a/core/types/block.go b/core/types/block.go index cb26320c94..4727af659e 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -99,6 +99,17 @@ func (h *Header) SanityCheck() error { return nil } +// EmptyBody returns true if there is no additional 'body' to complete the header +// that is: no transactions. +func (h *Header) EmptyBody() bool { + return h.TxHash == EmptyRootHash +} + +// EmptyReceipts returns true if there are no receipts for this header/block. +func (h *Header) EmptyReceipts() bool { + return h.ReceiptHash == EmptyRootHash +} + // hasherPool holds LegacyKeccak hashers. var hasherPool = sync.Pool{ New: func() interface{} { diff --git a/core/types/gen_log_json.go b/core/types/gen_log_json.go index fb684992c6..ae1692a32f 100644 --- a/core/types/gen_log_json.go +++ b/core/types/gen_log_json.go @@ -20,9 +20,9 @@ func (l Log) MarshalJSON() ([]byte, error) { Data hexutil.Bytes `json:"data" gencodec:"required"` BlockNumber hexutil.Uint64 `json:"blockNumber"` TxHash common.Hash `json:"transactionHash" gencodec:"required"` - TxIndex hexutil.Uint `json:"transactionIndex" gencodec:"required"` + TxIndex hexutil.Uint `json:"transactionIndex"` BlockHash common.Hash `json:"blockHash"` - Index hexutil.Uint `json:"logIndex" gencodec:"required"` + Index hexutil.Uint `json:"logIndex"` Removed bool `json:"removed"` } var enc Log @@ -46,9 +46,9 @@ func (l *Log) UnmarshalJSON(input []byte) error { Data *hexutil.Bytes `json:"data" gencodec:"required"` BlockNumber *hexutil.Uint64 `json:"blockNumber"` TxHash *common.Hash `json:"transactionHash" gencodec:"required"` - TxIndex *hexutil.Uint `json:"transactionIndex" gencodec:"required"` + TxIndex *hexutil.Uint `json:"transactionIndex"` BlockHash *common.Hash `json:"blockHash"` - Index *hexutil.Uint `json:"logIndex" gencodec:"required"` + Index *hexutil.Uint `json:"logIndex"` Removed *bool `json:"removed"` } var dec Log @@ -74,17 +74,15 @@ func (l *Log) UnmarshalJSON(input []byte) error { return errors.New("missing required field 'transactionHash' for Log") } l.TxHash = *dec.TxHash - if dec.TxIndex == nil { - return errors.New("missing required field 'transactionIndex' for Log") + if dec.TxIndex != nil { + l.TxIndex = uint(*dec.TxIndex) } - l.TxIndex = uint(*dec.TxIndex) if dec.BlockHash != nil { l.BlockHash = *dec.BlockHash } - if dec.Index == nil { - return errors.New("missing required field 'logIndex' for Log") + if dec.Index != nil { + l.Index = uint(*dec.Index) } - l.Index = uint(*dec.Index) if dec.Removed != nil { l.Removed = *dec.Removed } diff --git a/core/types/log.go b/core/types/log.go index fcd25d2368..ae5f8356f1 100644 --- a/core/types/log.go +++ b/core/types/log.go @@ -44,11 +44,11 @@ type Log struct { // hash of the transaction TxHash common.Hash `json:"transactionHash" gencodec:"required"` // index of the transaction in the block - TxIndex uint `json:"transactionIndex" gencodec:"required"` + TxIndex uint `json:"transactionIndex"` // hash of the block in which the transaction was included BlockHash common.Hash `json:"blockHash"` // index of the log in the block - Index uint `json:"logIndex" gencodec:"required"` + Index uint `json:"logIndex"` // The Removed field is true if this log was reverted due to a chain reorganisation. // You must pay attention to this field if you receive logs through a filter query. diff --git a/core/vm/eips.go b/core/vm/eips.go index d501e00d1a..34cf1ecef6 100644 --- a/core/vm/eips.go +++ b/core/vm/eips.go @@ -70,12 +70,11 @@ func enable1884(jt *JumpTable) { // jt[EXTCODEHASH].constantGas = params.ExtcodeHashGasEIP1884 // New opcode - jt[SELFBALANCE] = operation{ + jt[SELFBALANCE] = &operation{ execute: opSelfBalance, constantGas: GasFastStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, } } @@ -89,12 +88,11 @@ func opSelfBalance(pc *uint64, interpreter *EVMInterpreter, callContext *callCtx // - Adds an opcode that returns the current chain’s EIP-155 unique identifier func enable1344(jt *JumpTable) { // New opcode - jt[CHAINID] = operation{ + jt[CHAINID] = &operation{ execute: opChainID, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, } } @@ -119,29 +117,26 @@ func enable2200(jt *JumpTable) { // - Adds opcodes that jump to and return from subroutines func enable2315(jt *JumpTable) { // New opcode - jt[BEGINSUB] = operation{ + jt[BEGINSUB] = &operation{ execute: opBeginSub, constantGas: GasQuickStep, minStack: minStack(0, 0), maxStack: maxStack(0, 0), - valid: true, } // New opcode - jt[JUMPSUB] = operation{ + jt[JUMPSUB] = &operation{ execute: opJumpSub, constantGas: GasSlowStep, minStack: minStack(1, 0), maxStack: maxStack(1, 0), jumps: true, - valid: true, } // New opcode - jt[RETURNSUB] = operation{ + jt[RETURNSUB] = &operation{ execute: opReturnSub, constantGas: GasFastStep, minStack: minStack(0, 0), maxStack: maxStack(0, 0), - valid: true, jumps: true, } } diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 080c317034..1be993ca86 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -32,7 +32,7 @@ type Config struct { NoRecursion bool // Disables call, callcode, delegate call and create EnablePreimageRecording bool // Enables recording of SHA3/keccak preimages - JumpTable [256]operation // EVM instruction table, automatically populated if unset + JumpTable [256]*operation // EVM instruction table, automatically populated if unset EWASMInterpreter string // External EWASM interpreter options EVMInterpreter string // External EVM interpreter options @@ -96,7 +96,7 @@ func NewEVMInterpreter(evm *EVM, cfg *Config) *EVMInterpreter { // We use the STOP instruction whether to see // the jump table was initialised. If it was not // we'll set the default jump table. - if !cfg.JumpTable[STOP].valid { + if cfg.JumpTable[STOP] == nil { var jt JumpTable switch { case evm.chainRules.IsIstanbul: @@ -219,7 +219,7 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) ( // enough stack items available to perform the operation. op = contract.GetOp(pc) operation := in.cfg.JumpTable[op] - if !operation.valid { + if operation == nil { return nil, &ErrInvalidOpCode{opcode: op} } // Validate stack diff --git a/core/vm/jump_table.go b/core/vm/jump_table.go index ec4fd1a831..ffc7a60697 100644 --- a/core/vm/jump_table.go +++ b/core/vm/jump_table.go @@ -44,7 +44,6 @@ type operation struct { halts bool // indicates whether the operation should halt further execution jumps bool // indicates whether the program counter should not increment writes bool // determines whether this a state modifying operation - valid bool // indication whether the retrieved operation is valid and known reverts bool // determines whether the operation reverts state (implicitly halts) returns bool // determines whether the operations sets the return data content } @@ -60,7 +59,7 @@ var ( ) // JumpTable contains the EVM opcodes supported at a given fork. -type JumpTable [256]operation +type JumpTable [256]*operation // newIstanbulInstructionSet returns the frontier, homestead // byzantium, contantinople and petersburg instructions. @@ -78,42 +77,37 @@ func newIstanbulInstructionSet() JumpTable { // byzantium and contantinople instructions. func newConstantinopleInstructionSet() JumpTable { instructionSet := newByzantiumInstructionSet() - instructionSet[SHL] = operation{ + instructionSet[SHL] = &operation{ execute: opSHL, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, } - instructionSet[SHR] = operation{ + instructionSet[SHR] = &operation{ execute: opSHR, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, } - instructionSet[SAR] = operation{ + instructionSet[SAR] = &operation{ execute: opSAR, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, } - instructionSet[EXTCODEHASH] = operation{ + instructionSet[EXTCODEHASH] = &operation{ execute: opExtCodeHash, constantGas: params.ExtcodeHashGasConstantinople, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, } - instructionSet[CREATE2] = operation{ + instructionSet[CREATE2] = &operation{ execute: opCreate2, constantGas: params.Create2Gas, dynamicGas: gasCreate2, minStack: minStack(4, 1), maxStack: maxStack(4, 1), memorySize: memoryCreate2, - valid: true, writes: true, returns: true, } @@ -124,39 +118,35 @@ func newConstantinopleInstructionSet() JumpTable { // byzantium instructions. func newByzantiumInstructionSet() JumpTable { instructionSet := newSpuriousDragonInstructionSet() - instructionSet[STATICCALL] = operation{ + instructionSet[STATICCALL] = &operation{ execute: opStaticCall, constantGas: params.CallGasEIP150, dynamicGas: gasStaticCall, minStack: minStack(6, 1), maxStack: maxStack(6, 1), memorySize: memoryStaticCall, - valid: true, returns: true, } - instructionSet[RETURNDATASIZE] = operation{ + instructionSet[RETURNDATASIZE] = &operation{ execute: opReturnDataSize, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, } - instructionSet[RETURNDATACOPY] = operation{ + instructionSet[RETURNDATACOPY] = &operation{ execute: opReturnDataCopy, constantGas: GasFastestStep, dynamicGas: gasReturnDataCopy, minStack: minStack(3, 0), maxStack: maxStack(3, 0), memorySize: memoryReturnDataCopy, - valid: true, } - instructionSet[REVERT] = operation{ + instructionSet[REVERT] = &operation{ execute: opRevert, dynamicGas: gasRevert, minStack: minStack(2, 0), maxStack: maxStack(2, 0), memorySize: memoryRevert, - valid: true, reverts: true, returns: true, } @@ -188,14 +178,13 @@ func newTangerineWhistleInstructionSet() JumpTable { // instructions that can be executed during the homestead phase. func newHomesteadInstructionSet() JumpTable { instructionSet := newFrontierInstructionSet() - instructionSet[DELEGATECALL] = operation{ + instructionSet[DELEGATECALL] = &operation{ execute: opDelegateCall, dynamicGas: gasDelegateCall, constantGas: params.CallGasFrontier, minStack: minStack(6, 1), maxStack: maxStack(6, 1), memorySize: memoryDelegateCall, - valid: true, returns: true, } return instructionSet @@ -211,161 +200,138 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(0, 0), maxStack: maxStack(0, 0), halts: true, - valid: true, }, ADD: { execute: opAdd, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, MUL: { execute: opMul, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SUB: { execute: opSub, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, DIV: { execute: opDiv, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SDIV: { execute: opSdiv, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, MOD: { execute: opMod, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SMOD: { execute: opSmod, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, ADDMOD: { execute: opAddmod, constantGas: GasMidStep, minStack: minStack(3, 1), maxStack: maxStack(3, 1), - valid: true, }, MULMOD: { execute: opMulmod, constantGas: GasMidStep, minStack: minStack(3, 1), maxStack: maxStack(3, 1), - valid: true, }, EXP: { execute: opExp, dynamicGas: gasExpFrontier, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SIGNEXTEND: { execute: opSignExtend, constantGas: GasFastStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, LT: { execute: opLt, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, GT: { execute: opGt, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SLT: { execute: opSlt, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SGT: { execute: opSgt, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, EQ: { execute: opEq, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, ISZERO: { execute: opIszero, constantGas: GasFastestStep, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, AND: { execute: opAnd, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, XOR: { execute: opXor, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, OR: { execute: opOr, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, NOT: { execute: opNot, constantGas: GasFastestStep, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, BYTE: { execute: opByte, constantGas: GasFastestStep, minStack: minStack(2, 1), maxStack: maxStack(2, 1), - valid: true, }, SHA3: { execute: opSha3, @@ -374,56 +340,48 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(2, 1), maxStack: maxStack(2, 1), memorySize: memorySha3, - valid: true, }, ADDRESS: { execute: opAddress, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, BALANCE: { execute: opBalance, constantGas: params.BalanceGasFrontier, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, ORIGIN: { execute: opOrigin, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, CALLER: { execute: opCaller, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, CALLVALUE: { execute: opCallValue, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, CALLDATALOAD: { execute: opCallDataLoad, constantGas: GasFastestStep, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, CALLDATASIZE: { execute: opCallDataSize, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, CALLDATACOPY: { execute: opCallDataCopy, @@ -432,14 +390,12 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(3, 0), maxStack: maxStack(3, 0), memorySize: memoryCallDataCopy, - valid: true, }, CODESIZE: { execute: opCodeSize, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, CODECOPY: { execute: opCodeCopy, @@ -448,21 +404,18 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(3, 0), maxStack: maxStack(3, 0), memorySize: memoryCodeCopy, - valid: true, }, GASPRICE: { execute: opGasprice, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, EXTCODESIZE: { execute: opExtCodeSize, constantGas: params.ExtcodeSizeGasFrontier, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, EXTCODECOPY: { execute: opExtCodeCopy, @@ -471,42 +424,36 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(4, 0), maxStack: maxStack(4, 0), memorySize: memoryExtCodeCopy, - valid: true, }, BLOCKHASH: { execute: opBlockhash, constantGas: GasExtStep, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, COINBASE: { execute: opCoinbase, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, TIMESTAMP: { execute: opTimestamp, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, NUMBER: { execute: opNumber, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, POP: { execute: opPop, constantGas: GasQuickStep, minStack: minStack(1, 0), maxStack: maxStack(1, 0), - valid: true, }, MLOAD: { execute: opMload, @@ -515,7 +462,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(1, 1), maxStack: maxStack(1, 1), memorySize: memoryMLoad, - valid: true, }, MSTORE: { execute: opMstore, @@ -524,7 +470,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(2, 0), maxStack: maxStack(2, 0), memorySize: memoryMStore, - valid: true, }, MSTORE8: { execute: opMstore8, @@ -533,22 +478,18 @@ func newFrontierInstructionSet() JumpTable { memorySize: memoryMStore8, minStack: minStack(2, 0), maxStack: maxStack(2, 0), - - valid: true, }, SLOAD: { execute: opSload, constantGas: params.SloadGasFrontier, minStack: minStack(1, 1), maxStack: maxStack(1, 1), - valid: true, }, SSTORE: { execute: opSstore, dynamicGas: gasSStore, minStack: minStack(2, 0), maxStack: maxStack(2, 0), - valid: true, writes: true, }, JUMP: { @@ -557,7 +498,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(1, 0), maxStack: maxStack(1, 0), jumps: true, - valid: true, }, JUMPI: { execute: opJumpi, @@ -565,483 +505,414 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(2, 0), maxStack: maxStack(2, 0), jumps: true, - valid: true, }, PC: { execute: opPc, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, MSIZE: { execute: opMsize, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, GAS: { execute: opGas, constantGas: GasQuickStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, JUMPDEST: { execute: opJumpdest, constantGas: params.JumpdestGas, minStack: minStack(0, 0), maxStack: maxStack(0, 0), - valid: true, }, PUSH1: { execute: opPush1, constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH2: { execute: makePush(2, 2), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH3: { execute: makePush(3, 3), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH4: { execute: makePush(4, 4), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH5: { execute: makePush(5, 5), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH6: { execute: makePush(6, 6), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH7: { execute: makePush(7, 7), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH8: { execute: makePush(8, 8), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH9: { execute: makePush(9, 9), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH10: { execute: makePush(10, 10), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH11: { execute: makePush(11, 11), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH12: { execute: makePush(12, 12), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH13: { execute: makePush(13, 13), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH14: { execute: makePush(14, 14), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH15: { execute: makePush(15, 15), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH16: { execute: makePush(16, 16), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH17: { execute: makePush(17, 17), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH18: { execute: makePush(18, 18), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH19: { execute: makePush(19, 19), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH20: { execute: makePush(20, 20), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH21: { execute: makePush(21, 21), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH22: { execute: makePush(22, 22), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH23: { execute: makePush(23, 23), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH24: { execute: makePush(24, 24), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH25: { execute: makePush(25, 25), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH26: { execute: makePush(26, 26), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH27: { execute: makePush(27, 27), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH28: { execute: makePush(28, 28), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH29: { execute: makePush(29, 29), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH30: { execute: makePush(30, 30), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH31: { execute: makePush(31, 31), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, PUSH32: { execute: makePush(32, 32), constantGas: GasFastestStep, minStack: minStack(0, 1), maxStack: maxStack(0, 1), - valid: true, }, DUP1: { execute: makeDup(1), constantGas: GasFastestStep, minStack: minDupStack(1), maxStack: maxDupStack(1), - valid: true, }, DUP2: { execute: makeDup(2), constantGas: GasFastestStep, minStack: minDupStack(2), maxStack: maxDupStack(2), - valid: true, }, DUP3: { execute: makeDup(3), constantGas: GasFastestStep, minStack: minDupStack(3), maxStack: maxDupStack(3), - valid: true, }, DUP4: { execute: makeDup(4), constantGas: GasFastestStep, minStack: minDupStack(4), maxStack: maxDupStack(4), - valid: true, }, DUP5: { execute: makeDup(5), constantGas: GasFastestStep, minStack: minDupStack(5), maxStack: maxDupStack(5), - valid: true, }, DUP6: { execute: makeDup(6), constantGas: GasFastestStep, minStack: minDupStack(6), maxStack: maxDupStack(6), - valid: true, }, DUP7: { execute: makeDup(7), constantGas: GasFastestStep, minStack: minDupStack(7), maxStack: maxDupStack(7), - valid: true, }, DUP8: { execute: makeDup(8), constantGas: GasFastestStep, minStack: minDupStack(8), maxStack: maxDupStack(8), - valid: true, }, DUP9: { execute: makeDup(9), constantGas: GasFastestStep, minStack: minDupStack(9), maxStack: maxDupStack(9), - valid: true, }, DUP10: { execute: makeDup(10), constantGas: GasFastestStep, minStack: minDupStack(10), maxStack: maxDupStack(10), - valid: true, }, DUP11: { execute: makeDup(11), constantGas: GasFastestStep, minStack: minDupStack(11), maxStack: maxDupStack(11), - valid: true, }, DUP12: { execute: makeDup(12), constantGas: GasFastestStep, minStack: minDupStack(12), maxStack: maxDupStack(12), - valid: true, }, DUP13: { execute: makeDup(13), constantGas: GasFastestStep, minStack: minDupStack(13), maxStack: maxDupStack(13), - valid: true, }, DUP14: { execute: makeDup(14), constantGas: GasFastestStep, minStack: minDupStack(14), maxStack: maxDupStack(14), - valid: true, }, DUP15: { execute: makeDup(15), constantGas: GasFastestStep, minStack: minDupStack(15), maxStack: maxDupStack(15), - valid: true, }, DUP16: { execute: makeDup(16), constantGas: GasFastestStep, minStack: minDupStack(16), maxStack: maxDupStack(16), - valid: true, }, SWAP1: { execute: makeSwap(1), constantGas: GasFastestStep, minStack: minSwapStack(2), maxStack: maxSwapStack(2), - valid: true, }, SWAP2: { execute: makeSwap(2), constantGas: GasFastestStep, minStack: minSwapStack(3), maxStack: maxSwapStack(3), - valid: true, }, SWAP3: { execute: makeSwap(3), constantGas: GasFastestStep, minStack: minSwapStack(4), maxStack: maxSwapStack(4), - valid: true, }, SWAP4: { execute: makeSwap(4), constantGas: GasFastestStep, minStack: minSwapStack(5), maxStack: maxSwapStack(5), - valid: true, }, SWAP5: { execute: makeSwap(5), constantGas: GasFastestStep, minStack: minSwapStack(6), maxStack: maxSwapStack(6), - valid: true, }, SWAP6: { execute: makeSwap(6), constantGas: GasFastestStep, minStack: minSwapStack(7), maxStack: maxSwapStack(7), - valid: true, }, SWAP7: { execute: makeSwap(7), constantGas: GasFastestStep, minStack: minSwapStack(8), maxStack: maxSwapStack(8), - valid: true, }, SWAP8: { execute: makeSwap(8), constantGas: GasFastestStep, minStack: minSwapStack(9), maxStack: maxSwapStack(9), - valid: true, }, SWAP9: { execute: makeSwap(9), constantGas: GasFastestStep, minStack: minSwapStack(10), maxStack: maxSwapStack(10), - valid: true, }, SWAP10: { execute: makeSwap(10), constantGas: GasFastestStep, minStack: minSwapStack(11), maxStack: maxSwapStack(11), - valid: true, }, SWAP11: { execute: makeSwap(11), constantGas: GasFastestStep, minStack: minSwapStack(12), maxStack: maxSwapStack(12), - valid: true, }, SWAP12: { execute: makeSwap(12), constantGas: GasFastestStep, minStack: minSwapStack(13), maxStack: maxSwapStack(13), - valid: true, }, SWAP13: { execute: makeSwap(13), constantGas: GasFastestStep, minStack: minSwapStack(14), maxStack: maxSwapStack(14), - valid: true, }, SWAP14: { execute: makeSwap(14), constantGas: GasFastestStep, minStack: minSwapStack(15), maxStack: maxSwapStack(15), - valid: true, }, SWAP15: { execute: makeSwap(15), constantGas: GasFastestStep, minStack: minSwapStack(16), maxStack: maxSwapStack(16), - valid: true, }, SWAP16: { execute: makeSwap(16), constantGas: GasFastestStep, minStack: minSwapStack(17), maxStack: maxSwapStack(17), - valid: true, }, LOG0: { execute: makeLog(0), @@ -1049,7 +920,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(2, 0), maxStack: maxStack(2, 0), memorySize: memoryLog, - valid: true, writes: true, }, LOG1: { @@ -1058,7 +928,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(3, 0), maxStack: maxStack(3, 0), memorySize: memoryLog, - valid: true, writes: true, }, LOG2: { @@ -1067,7 +936,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(4, 0), maxStack: maxStack(4, 0), memorySize: memoryLog, - valid: true, writes: true, }, LOG3: { @@ -1076,7 +944,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(5, 0), maxStack: maxStack(5, 0), memorySize: memoryLog, - valid: true, writes: true, }, LOG4: { @@ -1085,7 +952,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(6, 0), maxStack: maxStack(6, 0), memorySize: memoryLog, - valid: true, writes: true, }, CREATE: { @@ -1095,7 +961,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(3, 1), maxStack: maxStack(3, 1), memorySize: memoryCreate, - valid: true, writes: true, returns: true, }, @@ -1106,7 +971,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(7, 1), maxStack: maxStack(7, 1), memorySize: memoryCall, - valid: true, returns: true, }, CALLCODE: { @@ -1116,7 +980,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(7, 1), maxStack: maxStack(7, 1), memorySize: memoryCall, - valid: true, returns: true, }, RETURN: { @@ -1126,7 +989,6 @@ func newFrontierInstructionSet() JumpTable { maxStack: maxStack(2, 0), memorySize: memoryReturn, halts: true, - valid: true, }, SELFDESTRUCT: { execute: opSuicide, @@ -1134,7 +996,6 @@ func newFrontierInstructionSet() JumpTable { minStack: minStack(1, 0), maxStack: maxStack(1, 0), halts: true, - valid: true, writes: true, }, } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 01fcdab482..2bfdbed348 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -246,7 +246,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, stateBloom: stateBloom, mux: mux, checkpoint: checkpoint, - queue: newQueue(), + queue: newQueue(blockCacheItems), peers: newPeerSet(), rttEstimate: uint64(rttDefaultEstimate), rttConfidence: uint64(1000000), @@ -400,7 +400,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.stateBloom.Close() } // Reset the queue, peer set and wake channels to clean any internal leftover state - d.queue.Reset() + d.queue.Reset(blockCacheItems) d.peers.Reset() for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { @@ -626,6 +626,9 @@ func (d *Downloader) Terminate() { default: close(d.quitCh) } + if d.stateBloom != nil { + d.stateBloom.Close() + } d.quitLock.Unlock() // Cancel any pending download requests @@ -658,7 +661,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) != 1 { - p.log.Debug("Multiple headers for single request", "headers", len(headers)) + p.log.Warn("Multiple headers for single request", "headers", len(headers)) return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) } head := headers[0] @@ -898,7 +901,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) // Make sure the peer actually gave something valid headers := packer.(*headerPack).headers if len(headers) != 1 { - p.log.Debug("Multiple headers for single request", "headers", len(headers)) + p.log.Warn("Multiple headers for single request", "headers", len(headers)) return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) } arrived = true @@ -922,7 +925,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) } header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { - p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) + p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number) } start = check @@ -1228,17 +1231,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( pack := packet.(*headerPack) return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh) } - expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } - throttle = func() bool { return false } - reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { - return d.queue.ReserveHeaders(p, count), false, nil + expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } + reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) { + return d.queue.ReserveHeaders(p, count), false, false } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } - setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } + setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { + p.SetHeadersIdle(accepted, deliveryTime) + } ) err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire, - d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, + d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve, nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") log.Debug("Skeleton fill terminated", "err", err) @@ -1261,10 +1265,10 @@ func (d *Downloader) fetchBodies(from uint64) error { expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) } capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } - setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } + setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) } ) err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire, - d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, + d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies, d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") log.Debug("Block body download terminated", "err", err) @@ -1285,10 +1289,12 @@ func (d *Downloader) fetchReceipts(from uint64) error { expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) } capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } - setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } + setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { + p.SetReceiptsIdle(accepted, deliveryTime) + } ) err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire, - d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, + d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts, d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") log.Debug("Transaction receipt download terminated", "err", err) @@ -1321,9 +1327,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - kind: textual label of the type being downloaded to display in log messages func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, - expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), + expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool), fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, - idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { + idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1339,6 +1345,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) return errCanceled case packet := <-deliveryCh: + deliveryTime := time.Now() // If the peer was previously banned and failed to deliver its pack // in a reasonable time frame, ignore its message. if peer := d.peers.Peer(packet.PeerId()); peer != nil { @@ -1351,7 +1358,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) // caused by a timed out request which came through in the end), set it to // idle. If the delivery's stale, the peer should have already been idled. if !errors.Is(err, errStaleDelivery) { - setIdle(peer, accepted) + setIdle(peer, accepted, deliveryTime) } // Issue a log to the user to see what's going on switch { @@ -1404,7 +1411,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) // how response times reacts, to it always requests one more than the minimum (i.e. min 2). if fails > 2 { peer.log.Trace("Data delivery timed out", "type", kind) - setIdle(peer, 0) + setIdle(peer, 0, time.Now()) } else { peer.log.Warn("Stalling delivery, dropping", "type", kind) @@ -1439,27 +1446,27 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) // Send a download request to all idle peers, until throttled progressed, throttled, running := false, false, inFlight() idles, total := idle() - + pendCount := pending() for _, peer := range idles { // Short circuit if throttling activated - if throttle() { - throttled = true + if throttled { break } // Short circuit if there is no more available task. - if pending() == 0 { + if pendCount = pending(); pendCount == 0 { break } // Reserve a chunk of fetches for a peer. A nil can mean either that // no more headers are available, or that the peer is known not to // have them. - request, progress, err := reserve(peer, capacity(peer)) - if err != nil { - return err - } + request, progress, throttle := reserve(peer, capacity(peer)) if progress { progressed = true } + if throttle { + throttled = true + throttleCounter.Inc(1) + } if request == nil { continue } @@ -1484,7 +1491,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { + if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 { return errPeersUnavailable } } @@ -1610,7 +1617,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er if n > 0 && rollback == 0 { rollback = chunk[0].Number.Uint64() } - log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) + log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err) return fmt.Errorf("%w: %v", errInvalidChain, err) } // All verifications passed, track all headers within the alloted limits @@ -1636,7 +1643,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er inserts := d.queue.Schedule(chunk, origin) if len(inserts) != len(chunk) { log.Debug("Stale headers") - rollbackErr = errBadPeer + rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk)) return fmt.Errorf("%w: stale headers", errBadPeer) } } @@ -1844,6 +1851,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { } func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) { + if len(results) == 0 { + return nil, nil, nil + } + if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot { + // the pivot is somewhere in the future + return nil, results, nil + } + // This can also be optimized, but only happens very seldom for _, result := range results { num := result.Header.Number.Uint64() switch { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 1933015518..11f6dd361b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -306,7 +306,7 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil { return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err) } - if _, ok := dl.ownHeaders[block.Hash()]; !ok { + if hdr := dl.getHeaderByHash(block.Hash()); hdr == nil { dl.ownHashes = append(dl.ownHashes, block.Hash()) dl.ownHeaders[block.Hash()] = block.Header() } @@ -570,7 +570,6 @@ func TestThrottling65Fast(t *testing.T) { testThrottling(t, 65, FastSync) } func testThrottling(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() - defer tester.terminate() // Create a long block chain to download and the tester targetBlocks := testChainBase.len() - 1 @@ -602,31 +601,32 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { time.Sleep(25 * time.Millisecond) tester.lock.Lock() - tester.downloader.queue.lock.Lock() - cached = len(tester.downloader.queue.blockDonePool) - if mode == FastSync { - if receipts := len(tester.downloader.queue.receiptDonePool); receipts < cached { - cached = receipts - } + { + tester.downloader.queue.resultCache.lock.Lock() + cached = tester.downloader.queue.resultCache.countCompleted() + tester.downloader.queue.resultCache.lock.Unlock() + frozen = int(atomic.LoadUint32(&blocked)) + retrieved = len(tester.ownBlocks) + } - frozen = int(atomic.LoadUint32(&blocked)) - retrieved = len(tester.ownBlocks) - tester.downloader.queue.lock.Unlock() tester.lock.Unlock() - if cached == blockCacheItems || cached == blockCacheItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { + if cached == blockCacheItems || + cached == blockCacheItems-reorgProtHeaderDelay || + retrieved+cached+frozen == targetBlocks+1 || + retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { break } } // Make sure we filled up the cache, then exhaust it time.Sleep(25 * time.Millisecond) // give it a chance to screw up - tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) } + // Permit the blocked blocks to import if atomic.LoadUint32(&blocked) > 0 { atomic.StoreUint32(&blocked, uint32(0)) @@ -638,6 +638,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { if err := <-errc; err != nil { t.Fatalf("block synchronization failed: %v", err) } + tester.terminate() + } // Tests that simple synchronization against a forked chain works correctly. In @@ -659,7 +661,6 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) { chainB := testChainForkLightB.shorten(testChainBase.len() + 80) tester.newPeer("fork A", protocol, chainA) tester.newPeer("fork B", protocol, chainB) - // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("fork A", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) @@ -748,15 +749,12 @@ func TestBoundedHeavyForkedSync65Light(t *testing.T) { testBoundedHeavyForkedSyn func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { t.Parallel() - tester := newTester() - defer tester.terminate() // Create a long enough forked chain chainA := testChainForkLightA chainB := testChainForkHeavy tester.newPeer("original", protocol, chainA) - tester.newPeer("heavy-rewriter", protocol, chainB) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("original", nil, mode); err != nil { @@ -764,10 +762,12 @@ func testBoundedHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) { } assertOwnChain(t, tester, chainA.len()) + tester.newPeer("heavy-rewriter", protocol, chainB) // Synchronise with the second peer and ensure that the fork is rejected to being too old if err := tester.sync("heavy-rewriter", nil, mode); err != errInvalidAncestor { t.Fatalf("sync failure mismatch: have %v, want %v", err, errInvalidAncestor) } + tester.terminate() } // Tests that an inactive downloader will not accept incoming block headers, @@ -959,7 +959,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() - defer tester.terminate() // Create a small enough block chain to download targetBlocks := 3*fsHeaderSafetyNet + 256 + int(fsMinFullBlocks) @@ -1039,6 +1038,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) { t.Fatalf("synchronised blocks mismatch: have %v, want %v", bs, chain.len()) } } + tester.terminate() } // Tests that a peer advertising a high TD doesn't get to stall the downloader @@ -1053,13 +1053,13 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) { t.Parallel() tester := newTester() - defer tester.terminate() chain := testChainBase.shorten(1) tester.newPeer("attack", protocol, chain) if err := tester.sync("attack", big.NewInt(1000000), mode); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } + tester.terminate() } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index 97cfb531a4..f963b22d38 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -40,4 +40,6 @@ var ( stateInMeter = metrics.NewRegisteredMeter("eth/downloader/states/in", nil) stateDropMeter = metrics.NewRegisteredMeter("eth/downloader/states/drop", nil) + + throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil) ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index b827a32705..49a1f88526 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -117,9 +117,7 @@ func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *pe return &peerConnection{ id: id, lacking: make(map[common.Hash]struct{}), - - peer: peer, - + peer: peer, version: version, log: logger, } @@ -173,12 +171,14 @@ func (p *peerConnection) FetchBodies(request *fetchRequest) error { } p.blockStarted = time.Now() - // Convert the header set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Headers)) - for _, header := range request.Headers { - hashes = append(hashes, header.Hash()) - } - go p.peer.RequestBodies(hashes) + go func() { + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + p.peer.RequestBodies(hashes) + }() return nil } @@ -195,12 +195,14 @@ func (p *peerConnection) FetchReceipts(request *fetchRequest) error { } p.receiptStarted = time.Now() - // Convert the header set to a retrievable slice - hashes := make([]common.Hash, 0, len(request.Headers)) - for _, header := range request.Headers { - hashes = append(hashes, header.Hash()) - } - go p.peer.RequestReceipts(hashes) + go func() { + // Convert the header set to a retrievable slice + hashes := make([]common.Hash, 0, len(request.Headers)) + for _, header := range request.Headers { + hashes = append(hashes, header.Hash()) + } + p.peer.RequestReceipts(hashes) + }() return nil } @@ -225,34 +227,34 @@ func (p *peerConnection) FetchNodeData(hashes []common.Hash) error { // SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval // requests. Its estimated header retrieval throughput is updated with that measured // just now. -func (p *peerConnection) SetHeadersIdle(delivered int) { - p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) +func (p *peerConnection) SetHeadersIdle(delivered int, deliveryTime time.Time) { + p.setIdle(deliveryTime.Sub(p.headerStarted), delivered, &p.headerThroughput, &p.headerIdle) } // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval // requests. Its estimated body retrieval throughput is updated with that measured // just now. -func (p *peerConnection) SetBodiesIdle(delivered int) { - p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) +func (p *peerConnection) SetBodiesIdle(delivered int, deliveryTime time.Time) { + p.setIdle(deliveryTime.Sub(p.blockStarted), delivered, &p.blockThroughput, &p.blockIdle) } // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt // retrieval requests. Its estimated receipt retrieval throughput is updated // with that measured just now. -func (p *peerConnection) SetReceiptsIdle(delivered int) { - p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) +func (p *peerConnection) SetReceiptsIdle(delivered int, deliveryTime time.Time) { + p.setIdle(deliveryTime.Sub(p.receiptStarted), delivered, &p.receiptThroughput, &p.receiptIdle) } // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie // data retrieval requests. Its estimated state retrieval throughput is updated // with that measured just now. -func (p *peerConnection) SetNodeDataIdle(delivered int) { - p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) +func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time) { + p.setIdle(deliveryTime.Sub(p.stateStarted), delivered, &p.stateThroughput, &p.stateIdle) } // setIdle sets the peer to idle, allowing it to execute new retrieval requests. // Its estimated retrieval throughput is updated with that measured just now. -func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { +func (p *peerConnection) setIdle(elapsed time.Duration, delivered int, throughput *float64, idle *int32) { // Irrelevant of the scaling, make sure the peer ends up idle defer atomic.StoreInt32(idle, 0) @@ -265,7 +267,9 @@ func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *f return } // Otherwise update the throughput with a new measurement - elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor + if elapsed <= 0 { + elapsed = 1 // +1 (ns) to ensure non-zero divisor + } measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured @@ -523,22 +527,20 @@ func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerC defer ps.lock.RUnlock() idle, total := make([]*peerConnection, 0, len(ps.peers)), 0 + tps := make([]float64, 0, len(ps.peers)) for _, p := range ps.peers { if p.version >= minProtocol && p.version <= maxProtocol { if idleCheck(p) { idle = append(idle, p) + tps = append(tps, throughput(p)) } total++ } } - for i := 0; i < len(idle); i++ { - for j := i + 1; j < len(idle); j++ { - if throughput(idle[i]) < throughput(idle[j]) { - idle[i], idle[j] = idle[j], idle[i] - } - } - } - return idle, total + // And sort them + sortPeers := &peerThroughputSort{idle, tps} + sort.Sort(sortPeers) + return sortPeers.p, total } // medianRTT returns the median RTT of the peerset, considering only the tuning @@ -571,3 +573,24 @@ func (ps *peerSet) medianRTT() time.Duration { } return median } + +// peerThroughputSort implements the Sort interface, and allows for +// sorting a set of peers by their throughput +// The sorted data is with the _highest_ throughput first +type peerThroughputSort struct { + p []*peerConnection + tp []float64 +} + +func (ps *peerThroughputSort) Len() int { + return len(ps.p) +} + +func (ps *peerThroughputSort) Less(i, j int) bool { + return ps.tp[i] > ps.tp[j] +} + +func (ps *peerThroughputSort) Swap(i, j int) { + ps.p[i], ps.p[j] = ps.p[j], ps.p[i] + ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i] +} diff --git a/eth/downloader/peer_test.go b/eth/downloader/peer_test.go new file mode 100644 index 0000000000..4bf0e200bb --- /dev/null +++ b/eth/downloader/peer_test.go @@ -0,0 +1,53 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package downloader + +import ( + "sort" + "testing" +) + +func TestPeerThroughputSorting(t *testing.T) { + a := &peerConnection{ + id: "a", + headerThroughput: 1.25, + } + b := &peerConnection{ + id: "b", + headerThroughput: 1.21, + } + c := &peerConnection{ + id: "c", + headerThroughput: 1.23, + } + + peers := []*peerConnection{a, b, c} + tps := []float64{a.headerThroughput, + b.headerThroughput, c.headerThroughput} + sortPeers := &peerThroughputSort{peers, tps} + sort.Sort(sortPeers) + if got, exp := sortPeers.p[0].id, "a"; got != exp { + t.Errorf("sort fail, got %v exp %v", got, exp) + } + if got, exp := sortPeers.p[1].id, "c"; got != exp { + t.Errorf("sort fail, got %v exp %v", got, exp) + } + if got, exp := sortPeers.p[2].id, "b"; got != exp { + t.Errorf("sort fail, got %v exp %v", got, exp) + } + +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 52e70ed9b2..8b4099d4f7 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/celo-org/celo-blockchain/common" @@ -32,6 +33,11 @@ import ( "github.com/celo-org/celo-blockchain/metrics" ) +const ( + bodyType = uint(0) + receiptType = uint(1) +) + var ( blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching @@ -54,8 +60,7 @@ type fetchRequest struct { // fetchResult is a struct collecting partial results from data fetchers until // all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { - Pending int // Number of data fetches still pending - Hash common.Hash // Hash of the header to prevent recalculating + pending int32 // Flag telling what deliveries are outstanding Header *types.Header Transactions types.Transactions @@ -64,6 +69,44 @@ type fetchResult struct { EpochSnarkData *types.EpochSnarkData } +func newFetchResult(header *types.Header, fastSync bool) *fetchResult { + item := &fetchResult{ + Header: header, + } + if !header.EmptyBody() { + item.pending |= (1 << bodyType) + } + if fastSync && !header.EmptyReceipts() { + item.pending |= (1 << receiptType) + } + return item +} + +// SetBodyDone flags the body as finished. +func (f *fetchResult) SetBodyDone() { + if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 { + atomic.AddInt32(&f.pending, -1) + } +} + +// AllDone checks if item is done. +func (f *fetchResult) AllDone() bool { + return atomic.LoadInt32(&f.pending) == 0 +} + +// SetReceiptsDone flags the receipts as finished. +func (f *fetchResult) SetReceiptsDone() { + if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 { + atomic.AddInt32(&f.pending, -2) + } +} + +// Done checks if the given type is done already +func (f *fetchResult) Done(kind uint) bool { + v := atomic.LoadInt32(&f.pending) + return v&(1< common.StorageSize(blockCacheMemory) { - limit = int((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) - } - // Calculate the number of slots already finished - finished := 0 - for _, result := range q.resultCache[:limit] { - if result == nil { - break - } - if _, ok := donePool[result.Hash]; ok { - finished++ - } - } - // Calculate the number of slots currently downloading - pending := 0 - for _, request := range pendPool { - for _, header := range request.Headers { - if header.Number.Uint64() < q.resultOffset+uint64(limit) { - pending++ - } - } - } - // Return the free slots to distribute - return limit - finished - pending + return (queued + pending) == 0 } // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill @@ -324,21 +307,22 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { break } // Make sure no duplicate requests are executed + // We cannot skip this, even if the block is empty, since this is + // what triggers the fetchResult creation. if _, ok := q.blockTaskPool[hash]; ok { log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash) - continue - } - if _, ok := q.receiptTaskPool[hash]; ok { - log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash) - continue + } else { + q.blockTaskPool[hash] = header + q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) } - // Queue the header for content retrieval - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) - - if q.mode == FastSync { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + // Queue for receipt retrieval + if q.mode == FastSync && !header.EmptyReceipts() { + if _, ok := q.receiptTaskPool[hash]; ok { + log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash) + } else { + q.receiptTaskPool[hash] = header + q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + } } inserts = append(inserts, header) q.headerHead = hash @@ -348,65 +332,76 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { } // Results retrieves and permanently removes a batch of fetch results from -// the cache. The result slice will be empty if the queue has been closed. +// the cache. the result slice will be empty if the queue has been closed. +// Results can be called concurrently with Deliver and Schedule, +// but assumes that there are not two simultaneous callers to Results func (q *queue) Results(block bool) []*fetchResult { - q.lock.Lock() - defer q.lock.Unlock() - - // Count the number of items available for processing - nproc := q.countProcessableItems() - for nproc == 0 && !q.closed { - if !block { - return nil + // Abort early if there are no items and non-blocking requested + if !block && !q.resultCache.HasCompletedItems() { + return nil + } + closed := false + for !closed && !q.resultCache.HasCompletedItems() { + // In order to wait on 'active', we need to obtain the lock. + // That may take a while, if someone is delivering at the same + // time, so after obtaining the lock, we check again if there + // are any results to fetch. + // Also, in-between we ask for the lock and the lock is obtained, + // someone can have closed the queue. In that case, we should + // return the available results and stop blocking + q.lock.Lock() + if q.resultCache.HasCompletedItems() || q.closed { + q.lock.Unlock() + break } + // No items available, and not closed q.active.Wait() - nproc = q.countProcessableItems() - } - // Since we have a batch limit, don't pull more into "dangling" memory - if nproc > maxResultsProcess { - nproc = maxResultsProcess - } - results := make([]*fetchResult, nproc) - copy(results, q.resultCache[:nproc]) - if len(results) > 0 { - // Mark results as done before dropping them from the cache. - for _, result := range results { - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) + closed = q.closed + q.lock.Unlock() + } + // Regardless if closed or not, we can still deliver whatever we have + results := q.resultCache.GetCompleted(maxResultsProcess) + for _, result := range results { + // Recalculate the result item weights to prevent memory exhaustion + size := result.Header.Size() + for _, receipt := range result.Receipts { + size += receipt.Size() } - // Delete the results from the cache and clear the tail. - copy(q.resultCache, q.resultCache[nproc:]) - for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { - q.resultCache[i] = nil + for _, tx := range result.Transactions { + size += tx.Size() } - // Advance the expected block number of the first cache entry. - q.resultOffset += uint64(nproc) + size += result.Randomness.Size() + q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize + } + // Using the newly calibrated resultsize, figure out the new throttle limit + // on the result cache + throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize) + throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) - // Recalculate the result item weights to prevent memory exhaustion - for _, result := range results { - size := result.Header.Size() - for _, receipt := range result.Receipts { - size += receipt.Size() - } - for _, tx := range result.Transactions { - size += tx.Size() - } - size += result.Randomness.Size() - q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize - } + // Log some info at certain times + if time.Since(q.lastStatLog) > 10*time.Second { + q.lastStatLog = time.Now() + info := q.Stats() + info = append(info, "throttle", throttleThreshold) + log.Info("Downloader queue stats", info...) } return results } -// countProcessableItems counts the processable items. -func (q *queue) countProcessableItems() int { - for i, result := range q.resultCache { - if result == nil || result.Pending > 0 { - return i - } +func (q *queue) Stats() []interface{} { + q.lock.RLock() + defer q.lock.RUnlock() + + return q.stats() +} + +func (q *queue) stats() []interface{} { + return []interface{}{ + "receiptTasks", q.receiptTaskQueue.Size(), + "blockTasks", q.blockTaskQueue.Size(), + "itemSize", q.resultSize, } - return len(q.resultCache) } // ReserveHeaders reserves a set of headers for the given peer, skipping any @@ -452,28 +447,21 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { // ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. -func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) { - isNoop := func(header *types.Header) bool { - // All headers must be fetched so that the random beacon can be updated correctly. - return false - } +func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, bool) { q.lock.Lock() defer q.lock.Unlock() - return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop) + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, bodyType) } // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping // any previously failed downloads. Beside the next batch of needed fetches, it // also returns a flag whether empty receipts were queued requiring importing. -func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) { - isNoop := func(header *types.Header) bool { - return header.ReceiptHash == types.EmptyRootHash - } +func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, bool) { q.lock.Lock() defer q.lock.Unlock() - return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop) + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType) } // reserveHeaders reserves a set of data download operations for a given peer, @@ -483,57 +471,76 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. +// +// Returns: +// item - the fetchRequest +// progress - whether any progress was made +// throttle - if the caller should throttle for a while func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, - pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { + pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) if taskQueue.Empty() { - return nil, false, nil + return nil, false, true } if _, ok := pendPool[p.id]; ok { - return nil, false, nil + return nil, false, false } - // Calculate an upper limit on the items we might fetch (i.e. throttling) - space := q.resultSlots(pendPool, donePool) - // Retrieve a batch of tasks, skipping previously failed ones send := make([]*types.Header, 0, count) skip := make([]*types.Header, 0) - progress := false - for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { - header := taskQueue.PopItem().(*types.Header) - hash := header.Hash() - - // If we're the first to request this task, initialise the result container - index := int(header.Number.Int64() - int64(q.resultOffset)) - if index >= len(q.resultCache) || index < 0 { - common.Report("index allocation went beyond available resultCache space") - return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain) + throttled := false + for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ { + // the task queue will pop items in order, so the highest prio block + // is also the lowest block number. + h, _ := taskQueue.Peek() + header := h.(*types.Header) + // we can ask the resultcache if this header is within the + // "prioritized" segment of blocks. If it is not, we need to throttle + + stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == FastSync) + if stale { + // Don't put back in the task queue, this item has already been + // delivered upstream + taskQueue.PopItem() + progress = true + delete(taskPool, header.Hash()) + proc = proc - 1 + log.Error("Fetch reservation already delivered", "number", header.Number.Uint64()) + continue } - if q.resultCache[index] == nil { - components := 1 - if q.mode == FastSync { - components = 2 - } - q.resultCache[index] = &fetchResult{ - Pending: components, - Hash: hash, - Header: header, - } + if throttle { + // There are no resultslots available. Leave it in the task queue + // However, if there are any left as 'skipped', we should not tell + // the caller to throttle, since we still want some other + // peer to fetch those for us + throttled = len(skip) == 0 + break } - // If this fetch task is a noop, skip this fetch operation - if isNoop(header) { - donePool[hash] = struct{}{} - delete(taskPool, hash) - - space, proc = space-1, proc-1 - q.resultCache[index].Pending-- + if err != nil { + // this most definitely should _not_ happen + log.Warn("Failed to reserve headers", "err", err) + // There are no resultslots available. Leave it in the task queue + break + } + // Only required if the reserve is for a body type + if kind == bodyType { + // All headers must be fetched so that the random beacon can be updated correctly. + item.pending |= (1 << bodyType) + } + if item.Done(kind) { + // If it's a noop, we can skip this task + delete(taskPool, header.Hash()) + taskQueue.PopItem() + proc = proc - 1 progress = true continue } + // Remove it from the task queue + taskQueue.PopItem() // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.Lacks(hash) { + if p.Lacks(header.Hash()) { skip = append(skip, header) } else { send = append(send, header) @@ -543,13 +550,13 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common for _, header := range skip { taskQueue.Push(header, -int64(header.Number.Uint64())) } - if progress { + if q.resultCache.HasCompletedItems() { // Wake Results, resultCache was modified q.active.Signal() } // Assemble and return the block download request if len(send) == 0 { - return nil, progress, nil + return nil, progress, throttled } request := &fetchRequest{ Peer: p, @@ -557,8 +564,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common Time: time.Now(), } pendPool[p.id] = request - - return request, progress, nil + return request, progress, throttled } // CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue. @@ -769,16 +775,21 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, randomn q.lock.Lock() defer q.lock.Unlock() - reconstruct := func(header *types.Header, index int, result *fetchResult) error { + validate := func(index int, header *types.Header) error { if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash { return errInvalidBody } + return nil + } + + reconstruct := func(index int, result *fetchResult) { result.Transactions = txLists[index] result.Randomness = randomnessList[index] result.EpochSnarkData = epochSnarkDataList[index] - return nil + result.SetBodyDone() } - return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct) + return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, + bodyReqTimer, len(txLists), validate, reconstruct) } // DeliverReceipts injects a receipt retrieval response into the results queue. @@ -787,25 +798,29 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, randomn func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) { q.lock.Lock() defer q.lock.Unlock() - - reconstruct := func(header *types.Header, index int, result *fetchResult) error { + validate := func(index int, header *types.Header) error { if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash { return errInvalidReceipt } - result.Receipts = receiptList[index] return nil } - return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct) + reconstruct := func(index int, result *fetchResult) { + result.Receipts = receiptList[index] + result.SetReceiptsDone() + } + return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, + receiptReqTimer, len(receiptList), validate, reconstruct) } // deliver injects a data retrieval response into the results queue. // // Note, this method expects the queue lock to be already held for writing. The -// reason the lock is not obtained in here is because the parameters already need +// reason this lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, - pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, - results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, + taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer, + results int, validate func(index int, header *types.Header) error, + reconstruct func(index int, result *fetchResult)) (int, error) { // Short circuit if the data was never requested request := pendPool[id] @@ -825,52 +840,53 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ var ( accepted int failure error - useful bool + i int + hashes []common.Hash ) - for i, header := range request.Headers { + for _, header := range request.Headers { // Short circuit assembly if no more fetch results are found if i >= results { break } - // Reconstruct the next result if contents match up - index := int(header.Number.Int64() - int64(q.resultOffset)) - if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil { - failure = errInvalidChain - break - } - if err := reconstruct(header, i, q.resultCache[index]); err != nil { + // Validate the fields + if err := validate(i, header); err != nil { failure = err break } - hash := header.Hash() - - donePool[hash] = struct{}{} - q.resultCache[index].Pending-- - useful = true - accepted++ + hashes = append(hashes, header.Hash()) + i++ + } + for _, header := range request.Headers[:i] { + if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil { + reconstruct(accepted, res) + } else { + // else: betweeen here and above, some other peer filled this result, + // or it was indeed a no-op. This should not happen, but if it does it's + // not something to panic about + log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err) + failure = errStaleDelivery + } // Clean up a successful fetch - request.Headers[i] = nil - delete(taskPool, hash) + delete(taskPool, hashes[accepted]) + accepted++ } // Return all failed or missing fetches to the queue - for _, header := range request.Headers { - if header != nil { - taskQueue.Push(header, -int64(header.Number.Uint64())) - } + for _, header := range request.Headers[accepted:] { + taskQueue.Push(header, -int64(header.Number.Uint64())) } // Wake up Results if accepted > 0 { q.active.Signal() } - // If none of the data was good, it's a stale delivery if failure == nil { return accepted, nil } + // If none of the data was good, it's a stale delivery if errors.Is(failure, errInvalidChain) { return accepted, failure } - if useful { + if accepted > 0 { return accepted, fmt.Errorf("partial failure: %v", failure) } return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery) @@ -883,8 +899,6 @@ func (q *queue) Prepare(offset uint64, mode SyncMode) { defer q.lock.Unlock() // Prepare the queue for sync results - if q.resultOffset < offset { - q.resultOffset = offset - } + q.resultCache.Prepare(offset) q.mode = mode } diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go new file mode 100644 index 0000000000..2694531f00 --- /dev/null +++ b/eth/downloader/queue_test.go @@ -0,0 +1,428 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "fmt" + "math/big" + "math/rand" + "sync" + "testing" + "time" + + "github.com/celo-org/celo-blockchain/common" + mockEngine "github.com/celo-org/celo-blockchain/consensus/consensustest" + "github.com/celo-org/celo-blockchain/core" + "github.com/celo-org/celo-blockchain/core/rawdb" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/celo-org/celo-blockchain/log" + "github.com/celo-org/celo-blockchain/params" +) + +var ( + testdb = rawdb.NewMemoryDatabase() + genesis = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000)) +) + +// makeChain creates a chain of n blocks starting at and including parent. +// the returned hash chain is ordered head->parent. In addition, every 3rd block +// contains a transaction and every 5th an uncle to allow testing correct block +// reassembly. +func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) { + blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, mockEngine.NewFaker(), testdb, n, func(i int, block *core.BlockGen) { + block.SetCoinbase(common.Address{seed}) + // Add one tx to every secondblock + if !empty && i%2 == 0 { + signer := types.MakeSigner(params.TestChainConfig, block.Number()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil, nil, nil, nil), signer, testKey) + if err != nil { + panic(err) + } + block.AddTx(tx) + } + }) + return blocks, receipts +} + +type chainData struct { + blocks []*types.Block + offset int +} + +var chain *chainData +var emptyChain *chainData + +func init() { + // Create a chain of blocks to import + targetBlocks := 128 + blocks, _ := makeChain(targetBlocks, 0, genesis, false) + chain = &chainData{blocks, 0} + + blocks, _ = makeChain(targetBlocks, 0, genesis, true) + emptyChain = &chainData{blocks, 0} +} + +func (chain *chainData) headers() []*types.Header { + hdrs := make([]*types.Header, len(chain.blocks)) + for i, b := range chain.blocks { + hdrs[i] = b.Header() + } + return hdrs +} + +func (chain *chainData) Len() int { + return len(chain.blocks) +} + +func dummyPeer(id string) *peerConnection { + p := &peerConnection{ + id: id, + lacking: make(map[common.Hash]struct{}), + } + return p +} + +func TestBasics(t *testing.T) { + q := newQueue(10) + if !q.Idle() { + t.Errorf("new queue should be idle") + } + q.Prepare(1, FastSync) + if res := q.Results(false); len(res) != 0 { + t.Fatal("new queue should have 0 results") + } + + // Schedule a batch of headers + q.Schedule(chain.headers(), 1) + if q.Idle() { + t.Errorf("queue should not be idle") + } + if got, exp := q.PendingBlocks(), chain.Len(); got != exp { + t.Errorf("wrong pending block count, got %d, exp %d", got, exp) + } + // Only non-empty receipts get added to task-queue + if got, exp := q.PendingReceipts(), 64; got != exp { + t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp) + } + // Items are now queued for downloading, next step is that we tell the + // queue that a certain peer will deliver them for us + { + peer := dummyPeer("peer-1") + fetchReq, _, throttle := q.ReserveBodies(peer, 50) + if !throttle { + // queue size is only 10, so throttling should occur + t.Fatal("should throttle") + } + // But we should still get the first things to fetch + if got, exp := len(fetchReq.Headers), 10; got != exp { + t.Fatalf("expected %d requests, got %d", exp, got) + } + if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp { + t.Fatalf("expected header %d, got %d", exp, got) + } + } + { + peer := dummyPeer("peer-2") + fetchReq, _, throttle := q.ReserveBodies(peer, 50) + + // The second peer should hit throttling + if !throttle { + t.Fatalf("should not throttle") + } + // And not get any fetches at all, since it was throttled to begin with + if fetchReq != nil { + t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers)) + } + } + //fmt.Printf("blockTaskQueue len: %d\n", q.blockTaskQueue.Size()) + //fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size()) + { + // The receipt delivering peer should not be affected + // by the throttling of body deliveries + peer := dummyPeer("peer-3") + fetchReq, _, throttle := q.ReserveReceipts(peer, 50) + if !throttle { + // queue size is only 10, so throttling should occur + t.Fatal("should throttle") + } + // But we should still get the first things to fetch + if got, exp := len(fetchReq.Headers), 5; got != exp { + t.Fatalf("expected %d requests, got %d", exp, got) + } + if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp { + t.Fatalf("expected header %d, got %d", exp, got) + } + + } + //fmt.Printf("blockTaskQueue len: %d\n", q.blockTaskQueue.Size()) + //fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size()) + //fmt.Printf("processable: %d\n", q.resultCache.countCompleted()) +} + +func TestEmptyBlocks(t *testing.T) { + q := newQueue(10) + + q.Prepare(1, FastSync) + // Schedule a batch of headers + q.Schedule(emptyChain.headers(), 1) + if q.Idle() { + t.Errorf("queue should not be idle") + } + if got, exp := q.PendingBlocks(), len(emptyChain.blocks); got != exp { + t.Errorf("wrong pending block count, got %d, exp %d", got, exp) + } + if got, exp := q.PendingReceipts(), 0; got != exp { + t.Errorf("wrong pending receipt count, got %d, exp %d", got, exp) + } + // They won't be processable, because the fetchresults haven't been + // created yet + if got, exp := q.resultCache.countCompleted(), 0; got != exp { + t.Errorf("wrong processable count, got %d, exp %d", got, exp) + } + + // Items are now queued for downloading, next step is that we tell the + // queue that a certain peer will deliver them for us + // That should trigger all of them to suddenly become 'done' + { + // Reserve blocks + peer := dummyPeer("peer-1") + fetchReq, _, _ := q.ReserveBodies(peer, 50) + + // blocks are empty, but must be fetched the bodies so that the random beacon can be updated correctly + if fetchReq == nil { + t.Fatal("there should be body fetch tasks remaining") + } + + } + if q.blockTaskQueue.Size() != len(emptyChain.blocks)-10 { + t.Errorf("expected block task queue to be 0, got %d", q.blockTaskQueue.Size()) + } + if q.receiptTaskQueue.Size() != 0 { + t.Errorf("expected receipt task queue to be 0, got %d", q.receiptTaskQueue.Size()) + } + //fmt.Printf("receiptTaskQueue len: %d\n", q.receiptTaskQueue.Size()) + { + peer := dummyPeer("peer-3") + fetchReq, _, _ := q.ReserveReceipts(peer, 50) + + // there should be nothing to fetch, blocks are empty + if fetchReq != nil { + t.Fatal("there should be no body fetch tasks remaining") + } + } + if got, exp := q.resultCache.countCompleted(), 0; got != exp { + t.Errorf("wrong processable count, got %d, exp %d", got, exp) + } +} + +// TestDelivery does some more extensive testing of events that happen, +// blocks that become known and peers that make reservations and deliveries. +// disabled since it's not really a unit-test, but can be executed to test +// some more advanced scenarios +func TestDelivery(t *testing.T) { + t.Skip("not really a unit-test, but can be executed to test some more advanced scenarios") + // the outside network, holding blocks + blo, rec := makeChain(128, 0, genesis, false) + world := newNetwork() + world.receipts = rec + world.chain = blo + world.progress(10) + if false { + log.Root().SetHandler(log.StdoutHandler) + + } + q := newQueue(10) + var wg sync.WaitGroup + q.Prepare(1, FastSync) + wg.Add(1) + go func() { + // deliver headers + defer wg.Done() + c := 1 + for { + //fmt.Printf("getting headers from %d\n", c) + hdrs := world.headers(c) + l := len(hdrs) + //fmt.Printf("scheduling %d headers, first %d last %d\n", + // l, hdrs[0].Number.Uint64(), hdrs[len(hdrs)-1].Number.Uint64()) + q.Schedule(hdrs, uint64(c)) + c += l + } + }() + wg.Add(1) + go func() { + // collect results + defer wg.Done() + tot := 0 + for { + res := q.Results(true) + tot += len(res) + fmt.Printf("got %d results, %d tot\n", len(res), tot) + // Now we can forget about these + world.forget(res[len(res)-1].Header.Number.Uint64()) + + } + }() + wg.Add(1) + go func() { + defer wg.Done() + // reserve body fetch + i := 4 + for { + peer := dummyPeer(fmt.Sprintf("peer-%d", i)) + f, _, _ := q.ReserveBodies(peer, rand.Intn(30)) + if f != nil { + var txs [][]*types.Transaction + var randomnessList []*types.Randomness + var epochSnarkDataList []*types.EpochSnarkData + numToSkip := rand.Intn(len(f.Headers)) + for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] { + txs = append(txs, world.getTransactions(hdr.Number.Uint64())) + randomnessList = append(randomnessList, &types.Randomness{}) + epochSnarkDataList = append(epochSnarkDataList, &types.EpochSnarkData{}) + } + time.Sleep(100 * time.Millisecond) + _, err := q.DeliverBodies(peer.id, txs, randomnessList, epochSnarkDataList) + if err != nil { + fmt.Printf("delivered %d bodies %v\n", len(txs), err) + } + } else { + i++ + time.Sleep(200 * time.Millisecond) + } + } + }() + go func() { + defer wg.Done() + // reserve receiptfetch + peer := dummyPeer("peer-3") + for { + f, _, _ := q.ReserveReceipts(peer, rand.Intn(50)) + if f != nil { + var rcs [][]*types.Receipt + for _, hdr := range f.Headers { + rcs = append(rcs, world.getReceipts(hdr.Number.Uint64())) + } + _, err := q.DeliverReceipts(peer.id, rcs) + if err != nil { + fmt.Printf("delivered %d receipts %v\n", len(rcs), err) + } + time.Sleep(100 * time.Millisecond) + } else { + time.Sleep(200 * time.Millisecond) + } + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + time.Sleep(300 * time.Millisecond) + //world.tick() + //fmt.Printf("trying to progress\n") + world.progress(rand.Intn(100)) + } + for i := 0; i < 50; i++ { + time.Sleep(2990 * time.Millisecond) + + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for { + time.Sleep(990 * time.Millisecond) + fmt.Printf("world block tip is %d\n", + world.chain[len(world.chain)-1].Header().Number.Uint64()) + fmt.Println(q.Stats()) + } + }() + wg.Wait() +} + +func newNetwork() *network { + var l sync.RWMutex + return &network{ + cond: sync.NewCond(&l), + offset: 1, // block 1 is at blocks[0] + } +} + +// represents the network +type network struct { + offset int + chain []*types.Block + receipts []types.Receipts + lock sync.RWMutex + cond *sync.Cond +} + +func (n *network) getTransactions(blocknum uint64) types.Transactions { + index := blocknum - uint64(n.offset) + return n.chain[index].Transactions() +} +func (n *network) getReceipts(blocknum uint64) types.Receipts { + index := blocknum - uint64(n.offset) + if got := n.chain[index].Header().Number.Uint64(); got != blocknum { + fmt.Printf("Err, got %d exp %d\n", got, blocknum) + panic("sd") + } + return n.receipts[index] +} + +func (n *network) forget(blocknum uint64) { + index := blocknum - uint64(n.offset) + n.chain = n.chain[index:] + n.receipts = n.receipts[index:] + n.offset = int(blocknum) + +} +func (n *network) progress(numBlocks int) { + + n.lock.Lock() + defer n.lock.Unlock() + //fmt.Printf("progressing...\n") + newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false) + n.chain = append(n.chain, newBlocks...) + n.receipts = append(n.receipts, newR...) + n.cond.Broadcast() + +} + +func (n *network) headers(from int) []*types.Header { + numHeaders := 128 + var hdrs []*types.Header + index := from - n.offset + + for index >= len(n.chain) { + // wait for progress + n.cond.L.Lock() + //fmt.Printf("header going into wait\n") + n.cond.Wait() + index = from - n.offset + n.cond.L.Unlock() + } + n.lock.RLock() + defer n.lock.RUnlock() + for i, b := range n.chain[index:] { + hdrs = append(hdrs, b.Header()) + if i >= numHeaders { + break + } + } + return hdrs +} diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go new file mode 100644 index 0000000000..38dcc8c6bd --- /dev/null +++ b/eth/downloader/resultstore.go @@ -0,0 +1,194 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/celo-org/celo-blockchain/core/types" +) + +// resultStore implements a structure for maintaining fetchResults, tracking their +// download-progress and delivering (finished) results. +type resultStore struct { + items []*fetchResult // Downloaded but not yet delivered fetch results + resultOffset uint64 // Offset of the first cached fetch result in the block chain + + // Internal index of first non-completed entry, updated atomically when needed. + // If all items are complete, this will equal length(items), so + // *important* : is not safe to use for indexing without checking against length + indexIncomplete int32 // atomic access + + // throttleThreshold is the limit up to which we _want_ to fill the + // results. If blocks are large, we want to limit the results to less + // than the number of available slots, and maybe only fill 1024 out of + // 8192 possible places. The queue will, at certain times, recalibrate + // this index. + throttleThreshold uint64 + + lock sync.RWMutex +} + +func newResultStore(size int) *resultStore { + return &resultStore{ + resultOffset: 0, + items: make([]*fetchResult, size), + throttleThreshold: uint64(size), + } +} + +// SetThrottleThreshold updates the throttling threshold based on the requested +// limit and the total queue capacity. It returns the (possibly capped) threshold +func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { + r.lock.Lock() + defer r.lock.Unlock() + + limit := uint64(len(r.items)) + if threshold >= limit { + threshold = limit + } + r.throttleThreshold = threshold + return r.throttleThreshold +} + +// AddFetch adds a header for body/receipt fetching. This is used when the queue +// wants to reserve headers for fetching. +// +// It returns the following: +// stale - if true, this item is already passed, and should not be requested again +// throttled - if true, the store is at capacity, this particular header is not prio now +// item - the result to store data into +// err - any error that occurred +func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) { + r.lock.Lock() + defer r.lock.Unlock() + + var index int + item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64()) + if err != nil || stale || throttled { + return stale, throttled, item, err + } + if item == nil { + item = newFetchResult(header, fastSync) + r.items[index] = item + } + return stale, throttled, item, err +} + +// GetDeliverySlot returns the fetchResult for the given header. If the 'stale' flag +// is true, that means the header has already been delivered 'upstream'. This method +// does not bubble up the 'throttle' flag, since it's moot at the point in time when +// the item is downloaded and ready for delivery +func (r *resultStore) GetDeliverySlot(headerNumber uint64) (*fetchResult, bool, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + res, _, stale, _, err := r.getFetchResult(headerNumber) + return res, stale, err +} + +// getFetchResult returns the fetchResult corresponding to the given item, and +// the index where the result is stored. +func (r *resultStore) getFetchResult(headerNumber uint64) (item *fetchResult, index int, stale, throttle bool, err error) { + index = int(int64(headerNumber) - int64(r.resultOffset)) + throttle = index >= int(r.throttleThreshold) + stale = index < 0 + + if index >= len(r.items) { + err = fmt.Errorf("%w: index allocation went beyond available resultStore space "+ + "(index [%d] = header [%d] - resultOffset [%d], len(resultStore) = %d", errInvalidChain, + index, headerNumber, r.resultOffset, len(r.items)) + return nil, index, stale, throttle, err + } + if stale { + return nil, index, stale, throttle, nil + } + item = r.items[index] + return item, index, stale, throttle, nil +} + +// hasCompletedItems returns true if there are processable items available +// this method is cheaper than countCompleted +func (r *resultStore) HasCompletedItems() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + if len(r.items) == 0 { + return false + } + if item := r.items[0]; item != nil && item.AllDone() { + return true + } + return false +} + +// countCompleted returns the number of items ready for delivery, stopping at +// the first non-complete item. +// +// The mthod assumes (at least) rlock is held. +func (r *resultStore) countCompleted() int { + // We iterate from the already known complete point, and see + // if any more has completed since last count + index := atomic.LoadInt32(&r.indexIncomplete) + for ; ; index++ { + if index >= int32(len(r.items)) { + break + } + result := r.items[index] + if result == nil || !result.AllDone() { + break + } + } + atomic.StoreInt32(&r.indexIncomplete, index) + return int(index) +} + +// GetCompleted returns the next batch of completed fetchResults +func (r *resultStore) GetCompleted(limit int) []*fetchResult { + r.lock.Lock() + defer r.lock.Unlock() + + completed := r.countCompleted() + if limit > completed { + limit = completed + } + results := make([]*fetchResult, limit) + copy(results, r.items[:limit]) + + // Delete the results from the cache and clear the tail. + copy(r.items, r.items[limit:]) + for i := len(r.items) - limit; i < len(r.items); i++ { + r.items[i] = nil + } + // Advance the expected block number of the first cache entry + r.resultOffset += uint64(limit) + atomic.AddInt32(&r.indexIncomplete, int32(-limit)) + + return results +} + +// Prepare initialises the offset with the given block number +func (r *resultStore) Prepare(offset uint64) { + r.lock.Lock() + defer r.lock.Unlock() + + if r.resultOffset < offset { + r.resultOffset = offset + } +} diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 9276060330..005aa82a4d 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -34,7 +34,7 @@ import ( // stateReq represents a batch of state fetch requests grouped together into // a single data retrieval network packet. type stateReq struct { - items []common.Hash // Hashes of the state items to download + nItems uint16 // Number of items requested for download (max is 384, so uint16 is sufficient) tasks map[common.Hash]*stateTask // Download tasks to track previous attempts timeout time.Duration // Maximum round trip time for this to complete timer *time.Timer // Timer to fire when the RTT timeout expires @@ -99,7 +99,6 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { finished []*stateReq // Completed or failed requests timeout = make(chan *stateReq) // Timed out active requests ) - // Run the state sync. log.Trace("State sync starting", "root", s.root) go s.run() @@ -235,16 +234,16 @@ func (d *Downloader) spindownStateSync(active map[string]*stateReq, finished []* if req == nil { continue } - req.peer.log.Trace("State peer marked idle (spindown)", "req.items", len(req.items), "reason", reason) + req.peer.log.Trace("State peer marked idle (spindown)", "req.items", int(req.nItems), "reason", reason) req.timer.Stop() delete(active, req.peer.id) - req.peer.SetNodeDataIdle(len(req.items)) + req.peer.SetNodeDataIdle(int(req.nItems), time.Now()) } // The 'finished' set contains deliveries that we were going to pass to processing. // Those are now moot, but we still need to set those peers as idle, which would // otherwise have been done after processing for _, req := range finished { - req.peer.SetNodeDataIdle(len(req.items)) + req.peer.SetNodeDataIdle(int(req.nItems), time.Now()) } } @@ -350,9 +349,10 @@ func (s *stateSync) loop() (err error) { return errCanceled case req := <-s.deliver: + deliveryTime := time.Now() // Response, disconnect or timeout triggered, drop the peer if stalling log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) - if len(req.items) <= 2 && !req.dropped && req.timedOut() { + if req.nItems <= 2 && !req.dropped && req.timedOut() { // 2 items are the minimum requested, if even that times out, we've no use of // this peer at the moment. log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id) @@ -376,7 +376,7 @@ func (s *stateSync) loop() (err error) { } // Process all the received blobs and check for stale delivery delivered, err := s.process(req) - req.peer.SetNodeDataIdle(delivered) + req.peer.SetNodeDataIdle(delivered, deliveryTime) if err != nil { log.Warn("Node data write error", "err", err) return err @@ -413,14 +413,14 @@ func (s *stateSync) assignTasks() { // Assign a batch of fetches proportional to the estimated latency/bandwidth cap := p.NodeDataCapacity(s.d.requestRTT()) req := &stateReq{peer: p, timeout: s.d.requestTTL()} - s.fillTasks(cap, req) + items := s.fillTasks(cap, req) // If the peer was assigned tasks to fetch, send the network request - if len(req.items) > 0 { - req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items), "root", s.root) + if len(items) > 0 { + req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(items), "root", s.root) select { case s.d.trackStateReq <- req: - req.peer.FetchNodeData(req.items) + req.peer.FetchNodeData(items) case <-s.cancel: case <-s.d.cancelCh: } @@ -430,7 +430,7 @@ func (s *stateSync) assignTasks() { // fillTasks fills the given request object with a maximum of n state download // tasks to send to the remote peer. -func (s *stateSync) fillTasks(n int, req *stateReq) { +func (s *stateSync) fillTasks(n int, req *stateReq) []common.Hash { // Refill available tasks from the scheduler. if len(s.tasks) < n { new := s.sched.Missing(n - len(s.tasks)) @@ -439,11 +439,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { } } // Find tasks that haven't been tried with the request's peer. - req.items = make([]common.Hash, 0, n) + items := make([]common.Hash, 0, n) req.tasks = make(map[common.Hash]*stateTask, n) for hash, t := range s.tasks { // Stop when we've gathered enough requests - if len(req.items) == n { + if len(items) == n { break } // Skip any requests we've already tried from this peer @@ -452,10 +452,12 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { } // Assign the request to this peer t.attempts[req.peer.id] = struct{}{} - req.items = append(req.items, hash) + items = append(items, hash) req.tasks[hash] = t delete(s.tasks, hash) } + req.nItems = uint16(len(items)) + return items } // process iterates over a batch of delivered state data, injecting each item diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index bf6bcd2c22..4aa65f0ab4 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -529,39 +529,40 @@ func (f *BlockFetcher) loop() { return } bodyFilterInMeter.Mark(int64(len(task.transactions))) - blocks := []*types.Block{} - for i := 0; i < len(task.blockHashes) && i < len(task.transactions) && i < len(task.randomness) && i < len(task.epochSnarkData); i++ { - // Match up a body to any possible completion request - matched := false - - for hash, announce := range f.completing { - if f.queued[hash] == nil { - if task.blockHashes[i] == announce.header.Hash() && announce.origin == task.peer { - // Mark the body matched, reassemble if still unknown - matched = true - - if f.getBlock(hash) == nil { - block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.randomness[i], task.epochSnarkData[i]) - block.ReceivedAt = task.time - - blocks = append(blocks, block) - } else { - f.forgetHash(hash) - } + // abort early if there's nothing explicitly requested + if len(f.completing) > 0 { + for i := 0; i < len(task.blockHashes) && i < len(task.transactions) && i < len(task.randomness) && i < len(task.epochSnarkData); i++ { + // Match up a body to any possible completion request + var matched = false + for hash, announce := range f.completing { + if f.queued[hash] != nil || announce.origin != task.peer { + continue + } + if task.blockHashes[i] != announce.header.Hash() { + continue + } + // Mark the body matched, reassemble if still unknown + matched = true + if f.getBlock(hash) == nil { + block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.randomness[i], task.epochSnarkData[i]) + block.ReceivedAt = task.time + blocks = append(blocks, block) + } else { + f.forgetHash(hash) } + + } + if matched { + task.blockHashes = append(task.blockHashes[:i], task.blockHashes[i+1:]...) + task.transactions = append(task.transactions[:i], task.transactions[i+1:]...) + task.randomness = append(task.randomness[:i], task.randomness[i+1:]...) + task.epochSnarkData = append(task.epochSnarkData[:i], task.epochSnarkData[i+1:]...) + i-- + continue } - } - if matched { - task.blockHashes = append(task.blockHashes[:i], task.blockHashes[i+1:]...) - task.transactions = append(task.transactions[:i], task.transactions[i+1:]...) - task.randomness = append(task.randomness[:i], task.randomness[i+1:]...) - task.epochSnarkData = append(task.epochSnarkData[:i], task.epochSnarkData[i+1:]...) - i-- - continue } } - bodyFilterOutMeter.Mark(int64(len(task.transactions))) select { case filter <- task: diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 3ecf8d3164..98188cfec6 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -329,6 +329,10 @@ func toBlockNumArg(number *big.Int) string { if number == nil { return "latest" } + pending := big.NewInt(-1) + if number.Cmp(pending) == 0 { + return "pending" + } return hexutil.EncodeBig(number) } diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 565ced9548..c592d011ad 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -97,6 +97,22 @@ func TestToFilterArg(t *testing.T) { }, nil, }, + { + "with negative fromBlock and negative toBlock", + ethereum.FilterQuery{ + Addresses: addresses, + FromBlock: big.NewInt(-1), + ToBlock: big.NewInt(-1), + Topics: [][]common.Hash{}, + }, + map[string]interface{}{ + "address": addresses, + "fromBlock": "pending", + "toBlock": "pending", + "topics": [][]common.Hash{}, + }, + nil, + }, { "with blockhash", ethereum.FilterQuery{ diff --git a/go.mod b/go.mod index 133b39c0e2..d71a1d0782 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277 github.com/hashicorp/golang-lru v0.5.4 github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff - github.com/holiman/uint256 v1.1.0 + github.com/holiman/uint256 v1.1.1 github.com/huin/goupnp v1.0.0 github.com/influxdata/influxdb v1.2.3-0.20180221223340-01288bdb0883 github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 diff --git a/go.sum b/go.sum index 77b22bdeca..52b845d70b 100644 --- a/go.sum +++ b/go.sum @@ -113,8 +113,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff h1:LeVKjw8pcDQj7WVVnbFvbD7ovcv+r/l15ka1NH6Lswc= github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff/go.mod h1:Feit0l8NcNO4g69XNjwvsR0LGcwMMfzI1TF253rOIlQ= -github.com/holiman/uint256 v1.1.0 h1:Iye6ze0DW9s+7EMn8y6Q4ebegDzpu28JQHEVM1Bq+Wg= -github.com/holiman/uint256 v1.1.0/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= +github.com/holiman/uint256 v1.1.1 h1:4JywC80b+/hSfljFlEBLHrrh+CIONLDz9NuFl0af4Mw= +github.com/holiman/uint256 v1.1.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= diff --git a/les/api.go b/les/api.go index 66bf8d3b6b..754b82a2d5 100644 --- a/les/api.go +++ b/les/api.go @@ -239,6 +239,18 @@ func (api *PrivateLightServerAPI) SetDefaultParams(params map[string]interface{} return err } +// SetConnectedBias set the connection bias, which is applied to already connected clients +// So that already connected client won't be kicked out very soon and we can ensure all +// connected clients can have enough time to request or sync some data. +// When the input parameter `bias` < 0 (illegal), return error. +func (api *PrivateLightServerAPI) SetConnectedBias(bias time.Duration) error { + if bias < time.Duration(0) { + return fmt.Errorf("bias illegal: %v less than 0", bias) + } + api.server.clientPool.setConnectedBias(bias) + return nil +} + // Benchmark runs a request performance benchmark with a given set of measurement setups // in multiple passes specified by passCount. The measurement time for each setup in each // pass is specified in milliseconds by length. diff --git a/les/clientpool.go b/les/clientpool.go index 9e618687d0..313b127d41 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -42,15 +42,7 @@ const ( persistCumulativeTimeRefresh = time.Minute * 5 // refresh period of the cumulative running time persistence posBalanceCacheLimit = 8192 // the maximum number of cached items in positive balance queue negBalanceCacheLimit = 8192 // the maximum number of cached items in negative balance queue - - // connectedBias is applied to already connected clients So that - // already connected client won't be kicked out very soon and we - // can ensure all connected clients can have enough time to request - // or sync some data. - // - // todo(rjl493456442) make it configurable. It can be the option of - // free trial time! - connectedBias = time.Minute * 3 + defaultConnectedBias = time.Minute * 3 // the default connectedBias used in clientPool ) // clientPool implements a client database that assigns a priority to each client @@ -94,7 +86,7 @@ type clientPool struct { freeClientCap uint64 // The capacity value of each free client startTime mclock.AbsTime // The timestamp at which the clientpool started running cumulativeTime int64 // The cumulative running time of clientpool at the start point. - disableBias bool // Disable connection bias(used in testing) + connectedBias time.Duration // The connection bias. 0: Disable connection bias(used in testing) } // clientPoolPeer represents a client peer in the pool. @@ -171,6 +163,7 @@ func newClientPool(db ethdb.Database, freeClientCap uint64, clock mclock.Clock, startTime: clock.Now(), cumulativeTime: ndb.getCumulativeTime(), stopCh: make(chan struct{}), + connectedBias: defaultConnectedBias, } // If the negative balance of free client is even lower than 1, // delete this entry. @@ -279,11 +272,7 @@ func (f *clientPool) connect(peer clientPoolPeer, capacity uint64) bool { newCount-- return newCapacity > f.capLimit || newCount > f.connLimit }) - bias := connectedBias - if f.disableBias { - bias = 0 - } - if newCapacity > f.capLimit || newCount > f.connLimit || (e.balanceTracker.estimatedPriority(now+mclock.AbsTime(bias), false)-kickPriority) > 0 { + if newCapacity > f.capLimit || newCount > f.connLimit || (e.balanceTracker.estimatedPriority(now+mclock.AbsTime(f.connectedBias), false)-kickPriority) > 0 { for _, c := range kickList { f.connectedQueue.Push(c) } @@ -371,6 +360,16 @@ func (f *clientPool) setDefaultFactors(posFactors, negFactors priceFactors) { f.defaultNegFactors = negFactors } +// setConnectedBias sets the connection bias, which is applied to already connected clients +// So that already connected client won't be kicked out very soon and we can ensure all +// connected clients can have enough time to request or sync some data. +func (f *clientPool) setConnectedBias(bias time.Duration) { + f.lock.Lock() + defer f.lock.Unlock() + + f.connectedBias = bias +} + // dropClient removes a client from the connected queue and finalizes its balance. // If kick is true then it also initiates the disconnection. func (f *clientPool) dropClient(e *clientInfo, now mclock.AbsTime, kick bool) { diff --git a/les/clientpool_test.go b/les/clientpool_test.go index a02b198193..2dae7a3622 100644 --- a/les/clientpool_test.go +++ b/les/clientpool_test.go @@ -91,7 +91,7 @@ func testClientPool(t *testing.T, connLimit, clientCount, paidCount int, randomD } pool = newClientPool(db, 1, &clock, disconnFn) ) - pool.disableBias = true + pool.setConnectedBias(0) pool.setLimits(connLimit, uint64(connLimit)) pool.setDefaultFactors(priceFactors{1, 0, 1}, priceFactors{1, 0, 1}) @@ -248,7 +248,7 @@ func TestPaidClientKickedOut(t *testing.T) { clock.Run(time.Millisecond) } clock.Run(time.Second) - clock.Run(connectedBias) + clock.Run(defaultConnectedBias) if !pool.connect(poolTestPeer(11), 0) { t.Fatalf("Free client should be accectped") } diff --git a/whisper/whisperv6/whisper_test.go b/whisper/whisperv6/whisper_test.go index ca4b12bee1..4c93dae71f 100644 --- a/whisper/whisperv6/whisper_test.go +++ b/whisper/whisperv6/whisper_test.go @@ -42,13 +42,13 @@ func TestWhisperBasic(t *testing.T) { t.Fatalf("failed Protocol Length: %v.", shh.Length) } if shh.Run == nil { - t.Fatalf("failed shh.Run.") + t.Fatal("failed shh.Run.") } if uint64(w.Version()) != ProtocolVersion { t.Fatalf("failed whisper Version: %v.", shh.Version) } if w.GetFilter("non-existent") != nil { - t.Fatalf("failed GetFilter.") + t.Fatal("failed GetFilter.") } peerID := make([]byte, 64) @@ -58,22 +58,22 @@ func TestWhisperBasic(t *testing.T) { t.Fatal("found peer for random key.") } if err := w.AllowP2PMessagesFromPeer(peerID); err == nil { - t.Fatalf("failed MarkPeerTrusted.") + t.Fatal("failed MarkPeerTrusted.") } exist := w.HasSymKey("non-existing") if exist { - t.Fatalf("failed HasSymKey.") + t.Fatal("failed HasSymKey.") } key, err := w.GetSymKey("non-existing") if err == nil { - t.Fatalf("failed GetSymKey(non-existing): false positive.") + t.Fatalf("failed GetSymKey(non-existing): false positive. key=%v", key) } if key != nil { - t.Fatalf("failed GetSymKey: false positive.") + t.Fatalf("failed GetSymKey: false positive. key=%v", key) } mail := w.Envelopes() if len(mail) != 0 { - t.Fatalf("failed w.Envelopes().") + t.Fatalf("failed w.Envelopes(). length=%d", len(mail)) } derived := pbkdf2.Key(peerID, nil, 65356, aesKeyLength, sha256.New) @@ -96,11 +96,11 @@ func TestWhisperBasic(t *testing.T) { id, err := w.NewKeyPair() if err != nil { - t.Fatalf("failed to generate new key pair: %s.", err) + t.Fatalf("failed to generate new key pair: %v.", err) } pk, err := w.GetPrivateKey(id) if err != nil { - t.Fatalf("failed to retrieve new key pair: %s.", err) + t.Fatalf("failed to retrieve new key pair: %v.", err) } if !validatePrivateKey(pk) { t.Fatalf("failed validatePrivateKey: %v.", pk) @@ -130,7 +130,7 @@ func TestWhisperAsymmetricKeyImport(t *testing.T) { privateKeys = append(privateKeys, pk) if !w.DeleteKeyPair(id) { - t.Fatalf("could not delete private key") + t.Fatal("could not delete private key") } } @@ -161,101 +161,101 @@ func TestWhisperIdentityManagement(t *testing.T) { } if !w.HasKeyPair(id1) { - t.Fatalf("failed HasIdentity(pk1).") + t.Fatal("failed HasIdentity(pk1).") } if !w.HasKeyPair(id2) { - t.Fatalf("failed HasIdentity(pk2).") + t.Fatal("failed HasIdentity(pk2).") } if pk1 == nil { - t.Fatalf("failed GetIdentity(pk1).") + t.Fatal("failed GetIdentity(pk1).") } if pk2 == nil { - t.Fatalf("failed GetIdentity(pk2).") + t.Fatal("failed GetIdentity(pk2).") } if !validatePrivateKey(pk1) { - t.Fatalf("pk1 is invalid.") + t.Fatal("pk1 is invalid.") } if !validatePrivateKey(pk2) { - t.Fatalf("pk2 is invalid.") + t.Fatal("pk2 is invalid.") } // Delete one identity done := w.DeleteKeyPair(id1) if !done { - t.Fatalf("failed to delete id1.") + t.Fatal("failed to delete id1.") } pk1, err = w.GetPrivateKey(id1) if err == nil { - t.Fatalf("retrieve the key pair: false positive.") + t.Fatalf("retrieve the key pair: false positive. key=%v", pk1) } pk2, err = w.GetPrivateKey(id2) if err != nil { t.Fatalf("failed to retrieve the key pair: %s.", err) } if w.HasKeyPair(id1) { - t.Fatalf("failed DeleteIdentity(pub1): still exist.") + t.Fatal("failed DeleteIdentity(pub1): still exist.") } if !w.HasKeyPair(id2) { - t.Fatalf("failed DeleteIdentity(pub1): pub2 does not exist.") + t.Fatal("failed DeleteIdentity(pub1): pub2 does not exist.") } if pk1 != nil { - t.Fatalf("failed DeleteIdentity(pub1): first key still exist.") + t.Fatal("failed DeleteIdentity(pub1): first key still exist.") } if pk2 == nil { - t.Fatalf("failed DeleteIdentity(pub1): second key does not exist.") + t.Fatal("failed DeleteIdentity(pub1): second key does not exist.") } // Delete again non-existing identity done = w.DeleteKeyPair(id1) if done { - t.Fatalf("delete id1: false positive.") + t.Fatal("delete id1: false positive.") } pk1, err = w.GetPrivateKey(id1) if err == nil { - t.Fatalf("retrieve the key pair: false positive.") + t.Fatalf("retrieve the key pair: false positive. key=%v", pk1) } pk2, err = w.GetPrivateKey(id2) if err != nil { t.Fatalf("failed to retrieve the key pair: %s.", err) } if w.HasKeyPair(id1) { - t.Fatalf("failed delete non-existing identity: exist.") + t.Fatal("failed delete non-existing identity: exist.") } if !w.HasKeyPair(id2) { - t.Fatalf("failed delete non-existing identity: pub2 does not exist.") + t.Fatal("failed delete non-existing identity: pub2 does not exist.") } if pk1 != nil { - t.Fatalf("failed delete non-existing identity: first key exist.") + t.Fatalf("failed delete non-existing identity: first key exist. key=%v", pk1) } if pk2 == nil { - t.Fatalf("failed delete non-existing identity: second key does not exist.") + t.Fatal("failed delete non-existing identity: second key does not exist.") } // Delete second identity done = w.DeleteKeyPair(id2) if !done { - t.Fatalf("failed to delete id2.") + t.Fatal("failed to delete id2.") } pk1, err = w.GetPrivateKey(id1) if err == nil { - t.Fatalf("retrieve the key pair: false positive.") + t.Fatalf("retrieve the key pair: false positive. key=%v", pk1) } pk2, err = w.GetPrivateKey(id2) if err == nil { - t.Fatalf("retrieve the key pair: false positive.") + t.Fatalf("retrieve the key pair: false positive. key=%v", pk2) } if w.HasKeyPair(id1) { - t.Fatalf("failed delete second identity: first identity exist.") + t.Fatal("failed delete second identity: first identity exist.") } if w.HasKeyPair(id2) { - t.Fatalf("failed delete second identity: still exist.") + t.Fatal("failed delete second identity: still exist.") } if pk1 != nil { - t.Fatalf("failed delete second identity: first key exist.") + t.Fatalf("failed delete second identity: first key exist. key=%v", pk1) } if pk2 != nil { - t.Fatalf("failed delete second identity: second key exist.") + t.Fatalf("failed delete second identity: second key exist. key=%v", pk2) } } @@ -274,23 +274,23 @@ func TestWhisperSymKeyManagement(t *testing.T) { k1, err = w.GetSymKey(id1) if err != nil { - t.Fatalf("failed GetSymKey(id1).") + t.Fatalf("failed GetSymKey(id1). err=%v", err) } k2, err = w.GetSymKey(id2) if err == nil { - t.Fatalf("failed GetSymKey(id2): false positive.") + t.Fatalf("failed GetSymKey(id2): false positive. key=%v", k2) } if !w.HasSymKey(id1) { - t.Fatalf("failed HasSymKey(id1).") + t.Fatal("failed HasSymKey(id1).") } if w.HasSymKey(id2) { - t.Fatalf("failed HasSymKey(id2): false positive.") + t.Fatal("failed HasSymKey(id2): false positive.") } if k1 == nil { - t.Fatalf("first key does not exist.") + t.Fatal("first key does not exist.") } if k2 != nil { - t.Fatalf("second key still exist.") + t.Fatalf("second key still exist. key=%v", k2) } // add existing id, nothing should change @@ -303,26 +303,26 @@ func TestWhisperSymKeyManagement(t *testing.T) { k1, err = w.GetSymKey(id1) if err != nil { - t.Fatalf("failed w.GetSymKey(id1).") + t.Fatalf("failed w.GetSymKey(id1). err=%v", err) } k2, err = w.GetSymKey(id2) if err == nil { - t.Fatalf("failed w.GetSymKey(id2): false positive.") + t.Fatalf("failed w.GetSymKey(id2): false positive. key=%v", k2) } if !w.HasSymKey(id1) { - t.Fatalf("failed w.HasSymKey(id1).") + t.Fatal("failed w.HasSymKey(id1).") } if w.HasSymKey(id2) { - t.Fatalf("failed w.HasSymKey(id2): false positive.") + t.Fatal("failed w.HasSymKey(id2): false positive.") } if k1 == nil { - t.Fatalf("first key does not exist.") + t.Fatal("first key does not exist.") } if !bytes.Equal(k1, randomKey) { - t.Fatalf("k1 != randomKey.") + t.Fatal("k1 != randomKey.") } if k2 != nil { - t.Fatalf("second key already exist.") + t.Fatalf("second key already exist. key=%v", k2) } id2, err = w.AddSymKeyDirect(randomKey) @@ -331,35 +331,35 @@ func TestWhisperSymKeyManagement(t *testing.T) { } k1, err = w.GetSymKey(id1) if err != nil { - t.Fatalf("failed w.GetSymKey(id1).") + t.Fatalf("failed w.GetSymKey(id1). err=%v", err) } k2, err = w.GetSymKey(id2) if err != nil { - t.Fatalf("failed w.GetSymKey(id2).") + t.Fatalf("failed w.GetSymKey(id2). err=%v", err) } if !w.HasSymKey(id1) { - t.Fatalf("HasSymKey(id1) failed.") + t.Fatal("HasSymKey(id1) failed.") } if !w.HasSymKey(id2) { - t.Fatalf("HasSymKey(id2) failed.") + t.Fatal("HasSymKey(id2) failed.") } if k1 == nil { - t.Fatalf("k1 does not exist.") + t.Fatal("k1 does not exist.") } if k2 == nil { - t.Fatalf("k2 does not exist.") + t.Fatal("k2 does not exist.") } if !bytes.Equal(k1, k2) { - t.Fatalf("k1 != k2.") + t.Fatal("k1 != k2.") } if !bytes.Equal(k1, randomKey) { - t.Fatalf("k1 != randomKey.") + t.Fatal("k1 != randomKey.") } if len(k1) != aesKeyLength { - t.Fatalf("wrong length of k1.") + t.Fatalf("wrong length of k1. length=%d", len(k1)) } if len(k2) != aesKeyLength { - t.Fatalf("wrong length of k2.") + t.Fatalf("wrong length of k2. length=%d", len(k2)) } w.DeleteSymKey(id1) @@ -368,49 +368,46 @@ func TestWhisperSymKeyManagement(t *testing.T) { t.Fatalf("failed w.GetSymKey(id1): false positive.") } if k1 != nil { - t.Fatalf("failed GetSymKey(id1): false positive.") + t.Fatalf("failed GetSymKey(id1): false positive. key=%v", k1) } k2, err = w.GetSymKey(id2) if err != nil { - t.Fatalf("failed w.GetSymKey(id2).") + t.Fatalf("failed w.GetSymKey(id2). err=%v", err) } if w.HasSymKey(id1) { - t.Fatalf("failed to delete first key: still exist.") + t.Fatal("failed to delete first key: still exist.") } if !w.HasSymKey(id2) { - t.Fatalf("failed to delete first key: second key does not exist.") - } - if k1 != nil { - t.Fatalf("failed to delete first key.") + t.Fatal("failed to delete first key: second key does not exist.") } if k2 == nil { - t.Fatalf("failed to delete first key: second key is nil.") + t.Fatal("failed to delete first key: second key is nil.") } w.DeleteSymKey(id1) w.DeleteSymKey(id2) k1, err = w.GetSymKey(id1) if err == nil { - t.Fatalf("failed w.GetSymKey(id1): false positive.") + t.Fatalf("failed w.GetSymKey(id1): false positive. key=%v", k1) } k2, err = w.GetSymKey(id2) if err == nil { - t.Fatalf("failed w.GetSymKey(id2): false positive.") + t.Fatalf("failed w.GetSymKey(id2): false positive. key=%v", k2) } if k1 != nil || k2 != nil { - t.Fatalf("k1 or k2 is not nil") + t.Fatal("k1 or k2 is not nil") } if w.HasSymKey(id1) { - t.Fatalf("failed to delete second key: first key exist.") + t.Fatal("failed to delete second key: first key exist.") } if w.HasSymKey(id2) { - t.Fatalf("failed to delete second key: still exist.") + t.Fatal("failed to delete second key: still exist.") } if k1 != nil { - t.Fatalf("failed to delete second key: first key is not nil.") + t.Fatal("failed to delete second key: first key is not nil.") } if k2 != nil { - t.Fatalf("failed to delete second key: second key is not nil.") + t.Fatal("failed to delete second key: second key is not nil.") } randomKey = make([]byte, aesKeyLength+1) @@ -431,23 +428,23 @@ func TestWhisperSymKeyManagement(t *testing.T) { } k1, err = w.GetSymKey(id1) if err != nil { - t.Fatalf("failed w.GetSymKey(id1).") + t.Fatalf("failed w.GetSymKey(id1). err=%v", err) } k2, err = w.GetSymKey(id2) if err != nil { - t.Fatalf("failed w.GetSymKey(id2).") + t.Fatalf("failed w.GetSymKey(id2). err=%v", err) } if !w.HasSymKey(id1) { - t.Fatalf("HasSymKey(id1) failed.") + t.Fatal("HasSymKey(id1) failed.") } if !w.HasSymKey(id2) { - t.Fatalf("HasSymKey(id2) failed.") + t.Fatal("HasSymKey(id2) failed.") } if !validateDataIntegrity(k2, aesKeyLength) { - t.Fatalf("key validation failed.") + t.Fatal("key validation failed.") } if !bytes.Equal(k1, k2) { - t.Fatalf("k1 != k2.") + t.Fatal("k1 != k2.") } } @@ -606,7 +603,7 @@ func TestCustomization(t *testing.T) { <-ticker.C mail := f.Retrieve() if len(mail) > 0 { - t.Fatalf("received premature mail") + t.Fatalf("received premature mail. mail=%v", mail) } } @@ -693,10 +690,10 @@ func TestSymmetricSendCycle(t *testing.T) { mail1 := filter1.Retrieve() mail2 := filter2.Retrieve() if len(mail2) == 0 { - t.Fatalf("did not receive any email for filter 2") + t.Fatal("did not receive any email for filter 2.") } if len(mail1) == 0 { - t.Fatalf("did not receive any email for filter 1") + t.Fatal("did not receive any email for filter 1.") } } @@ -767,7 +764,7 @@ func TestSymmetricSendWithoutAKey(t *testing.T) { <-ticker.C mail := filter.Retrieve() if len(mail) == 0 { - t.Fatalf("did not receive message in spite of not setting a public key") + t.Fatal("did not receive message in spite of not setting a public key") } } @@ -835,7 +832,7 @@ func TestSymmetricSendKeyMismatch(t *testing.T) { <-ticker.C mail := filter.Retrieve() if len(mail) > 0 { - t.Fatalf("received a message when keys weren't matching") + t.Fatalf("received a message when keys weren't matching. message=%v", mail) } } @@ -847,48 +844,48 @@ func TestBloom(t *testing.T) { x[32] = byte(1) x[BloomFilterSize-1] = byte(128) if !BloomFilterMatch(x, b) || !BloomFilterMatch(b, x) { - t.Fatalf("bloom filter does not match the mask") + t.Fatal("bloom filter does not match the mask") } _, err := mrand.Read(b) if err != nil { - t.Fatalf("math rand error") + t.Fatalf("math rand error. err=%v", err) } _, err = mrand.Read(x) if err != nil { - t.Fatalf("math rand error") + t.Fatalf("math rand error. err=%v", err) } if !BloomFilterMatch(b, b) { - t.Fatalf("bloom filter does not match self") + t.Fatal("bloom filter does not match self") } x = addBloom(x, b) if !BloomFilterMatch(x, b) { - t.Fatalf("bloom filter does not match combined bloom") + t.Fatal("bloom filter does not match combined bloom") } if !isFullNode(nil) { - t.Fatalf("isFullNode did not recognize nil as full node") + t.Fatal("isFullNode did not recognize nil as full node") } x[17] = 254 if isFullNode(x) { - t.Fatalf("isFullNode false positive") + t.Fatal("isFullNode false positive") } for i := 0; i < BloomFilterSize; i++ { b[i] = byte(255) } if !isFullNode(b) { - t.Fatalf("isFullNode false negative") + t.Fatal("isFullNode false negative") } if BloomFilterMatch(x, b) { - t.Fatalf("bloomFilterMatch false positive") + t.Fatal("bloomFilterMatch false positive") } if !BloomFilterMatch(b, x) { - t.Fatalf("bloomFilterMatch false negative") + t.Fatal("bloomFilterMatch false negative") } w := New(&DefaultConfig) f := w.BloomFilter() if f != nil { - t.Fatalf("wrong bloom on creation") + t.Fatal("wrong bloom on creation") } err = w.SetBloomFilter(x) if err != nil { @@ -896,6 +893,6 @@ func TestBloom(t *testing.T) { } f = w.BloomFilter() if !BloomFilterMatch(f, x) || !BloomFilterMatch(x, f) { - t.Fatalf("retireved wrong bloom filter") + t.Fatal("retireved wrong bloom filter") } }