diff --git a/VERSION b/VERSION index 53ed4ba51e..267637b120 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.8.9 +1.8.11 diff --git a/build/ci.go b/build/ci.go index 79dcc146c3..9f3ed829c6 100644 --- a/build/ci.go +++ b/build/ci.go @@ -330,6 +330,7 @@ func doLint(cmdline []string) { configs := []string{ "--vendor", "--tests", + "--deadline=10m", "--disable-all", "--enable=goimports", "--enable=varcheck", diff --git a/common/bytes.go b/common/bytes.go index e2adc80059..ff309ca937 100644 --- a/common/bytes.go +++ b/common/bytes.go @@ -17,17 +17,24 @@ // Package common contains various helper functions. package common -import "encoding/hex" +import ( + "encoding/hex" +) +// ToHex returns the hex representation of b, prefixed with '0x'. +// For empty slices, the return value is "0x0". +// +// Deprecated: use hexutil.Encode instead. func ToHex(b []byte) string { hex := Bytes2Hex(b) - // Prefer output of "0x0" instead of "0x" if len(hex) == 0 { hex = "0" } return "0x" + hex } +// FromHex returns the bytes represented by the hexadecimal string s. +// s may be prefixed with "0x". func FromHex(s string) []byte { if len(s) > 1 { if s[0:2] == "0x" || s[0:2] == "0X" { @@ -40,9 +47,7 @@ func FromHex(s string) []byte { return Hex2Bytes(s) } -// Copy bytes -// -// Returns an exact copy of the provided bytes +// CopyBytes returns an exact copy of the provided bytes. func CopyBytes(b []byte) (copiedBytes []byte) { if b == nil { return nil @@ -53,14 +58,17 @@ func CopyBytes(b []byte) (copiedBytes []byte) { return } +// hasHexPrefix validates str begins with '0x' or '0X'. func hasHexPrefix(str string) bool { return len(str) >= 2 && str[0] == '0' && (str[1] == 'x' || str[1] == 'X') } +// isHexCharacter returns bool of c being a valid hexadecimal. func isHexCharacter(c byte) bool { return ('0' <= c && c <= '9') || ('a' <= c && c <= 'f') || ('A' <= c && c <= 'F') } +// isHex validates whether each byte is valid hexadecimal string. func isHex(str string) bool { if len(str)%2 != 0 { return false @@ -73,16 +81,18 @@ func isHex(str string) bool { return true } +// Bytes2Hex returns the hexadecimal encoding of d. func Bytes2Hex(d []byte) string { return hex.EncodeToString(d) } +// Hex2Bytes returns the bytes represented by the hexadecimal string str. func Hex2Bytes(str string) []byte { h, _ := hex.DecodeString(str) - return h } +// Hex2BytesFixed returns bytes of a specified fixed length flen. func Hex2BytesFixed(str string, flen int) []byte { h, _ := hex.DecodeString(str) if len(h) == flen { @@ -96,6 +106,7 @@ func Hex2BytesFixed(str string, flen int) []byte { return hh } +// RightPadBytes zero-pads slice to the right up to length l. func RightPadBytes(slice []byte, l int) []byte { if l <= len(slice) { return slice @@ -107,6 +118,7 @@ func RightPadBytes(slice []byte, l int) []byte { return padded } +// LeftPadBytes zero-pads slice to the left up to length l. func LeftPadBytes(slice []byte, l int) []byte { if l <= len(slice) { return slice diff --git a/common/math/big.go b/common/math/big.go index 7872786503..dbf2770a94 100644 --- a/common/math/big.go +++ b/common/math/big.go @@ -78,7 +78,7 @@ func ParseBig256(s string) (*big.Int, bool) { return bigint, ok } -// MustParseBig parses s as a 256 bit big integer and panics if the string is invalid. +// MustParseBig256 parses s as a 256 bit big integer and panics if the string is invalid. func MustParseBig256(s string) *big.Int { v, ok := ParseBig256(s) if !ok { @@ -186,9 +186,8 @@ func U256(x *big.Int) *big.Int { func S256(x *big.Int) *big.Int { if x.Cmp(tt255) < 0 { return x - } else { - return new(big.Int).Sub(x, tt256) } + return new(big.Int).Sub(x, tt256) } // Exp implements exponentiation by squaring. diff --git a/common/number/int.go b/common/number/int.go index 6dab2436de..5b50669703 100644 --- a/common/number/int.go +++ b/common/number/int.go @@ -34,13 +34,12 @@ func limitUnsigned256(x *Number) *Number { func limitSigned256(x *Number) *Number { if x.num.Cmp(tt255) < 0 { return x - } else { - x.num.Sub(x.num, tt256) - return x } + x.num.Sub(x.num, tt256) + return x } -// Number function +// Initialiser is a Number function type Initialiser func(n int64) *Number // A Number represents a generic integer with a bounding function limiter. Limit is called after each operations @@ -51,65 +50,65 @@ type Number struct { limit func(n *Number) *Number } -// Returns a new initialiser for a new *Number without having to expose certain fields +// NewInitialiser returns a new initialiser for a new *Number without having to expose certain fields func NewInitialiser(limiter func(*Number) *Number) Initialiser { return func(n int64) *Number { return &Number{big.NewInt(n), limiter} } } -// Return a Number with a UNSIGNED limiter up to 256 bits +// Uint256 returns a Number with a UNSIGNED limiter up to 256 bits func Uint256(n int64) *Number { return &Number{big.NewInt(n), limitUnsigned256} } -// Return a Number with a SIGNED limiter up to 256 bits +// Int256 returns Number with a SIGNED limiter up to 256 bits func Int256(n int64) *Number { return &Number{big.NewInt(n), limitSigned256} } -// Returns a Number with a SIGNED unlimited size +// Big returns a Number with a SIGNED unlimited size func Big(n int64) *Number { return &Number{big.NewInt(n), func(x *Number) *Number { return x }} } -// Sets i to sum of x+y +// Add sets i to sum of x+y func (i *Number) Add(x, y *Number) *Number { i.num.Add(x.num, y.num) return i.limit(i) } -// Sets i to difference of x-y +// Sub sets i to difference of x-y func (i *Number) Sub(x, y *Number) *Number { i.num.Sub(x.num, y.num) return i.limit(i) } -// Sets i to product of x*y +// Mul sets i to product of x*y func (i *Number) Mul(x, y *Number) *Number { i.num.Mul(x.num, y.num) return i.limit(i) } -// Sets i to the quotient prodject of x/y +// Div sets i to the quotient prodject of x/y func (i *Number) Div(x, y *Number) *Number { i.num.Div(x.num, y.num) return i.limit(i) } -// Sets i to x % y +// Mod sets i to x % y func (i *Number) Mod(x, y *Number) *Number { i.num.Mod(x.num, y.num) return i.limit(i) } -// Sets i to x << s +// Lsh sets i to x << s func (i *Number) Lsh(x *Number, s uint) *Number { i.num.Lsh(x.num, s) return i.limit(i) } -// Sets i to x^y +// Pow sets i to x^y func (i *Number) Pow(x, y *Number) *Number { i.num.Exp(x.num, y.num, big.NewInt(0)) return i.limit(i) @@ -117,13 +116,13 @@ func (i *Number) Pow(x, y *Number) *Number { // Setters -// Set x to i +// Set sets x to i func (i *Number) Set(x *Number) *Number { i.num.Set(x.num) return i.limit(i) } -// Set x bytes to i +// SetBytes sets x bytes to i func (i *Number) SetBytes(x []byte) *Number { i.num.SetBytes(x) return i.limit(i) @@ -140,12 +139,12 @@ func (i *Number) Cmp(x *Number) int { // Getters -// Returns the string representation of i +// String returns the string representation of i func (i *Number) String() string { return i.num.String() } -// Returns the byte representation of i +// Bytes returns the byte representation of i func (i *Number) Bytes() []byte { return i.num.Bytes() } @@ -160,17 +159,17 @@ func (i *Number) Int64() int64 { return i.num.Int64() } -// Returns the signed version of i +// Int256 returns the signed version of i func (i *Number) Int256() *Number { return Int(0).Set(i) } -// Returns the unsigned version of i +// Uint256 returns the unsigned version of i func (i *Number) Uint256() *Number { return Uint(0).Set(i) } -// Returns the index of the first bit that's set to 1 +// FirstBitSet returns the index of the first bit that's set to 1 func (i *Number) FirstBitSet() int { for j := 0; j < i.num.BitLen(); j++ { if i.num.Bit(j) > 0 { diff --git a/common/path.go b/common/path.go index bd8da86e74..69820cfe5d 100644 --- a/common/path.go +++ b/common/path.go @@ -30,6 +30,7 @@ func MakeName(name, version string) string { return fmt.Sprintf("%s/v%s/%s/%s", name, version, runtime.GOOS, runtime.Version()) } +// FileExist checks if a file exists at filePath. func FileExist(filePath string) bool { _, err := os.Stat(filePath) if err != nil && os.IsNotExist(err) { @@ -39,9 +40,10 @@ func FileExist(filePath string) bool { return true } -func AbsolutePath(Datadir string, filename string) string { +// AbsolutePath returns datadir + filename, or filename if it is absolute. +func AbsolutePath(datadir string, filename string) string { if filepath.IsAbs(filename) { return filename } - return filepath.Join(Datadir, filename) + return filepath.Join(datadir, filename) } diff --git a/common/types.go b/common/types.go index 0b94fb2c25..12c26d94bc 100644 --- a/common/types.go +++ b/common/types.go @@ -42,19 +42,30 @@ var ( // Hash represents the 32 byte Keccak256 hash of arbitrary data. type Hash [HashLength]byte +// BytesToHash sets b to hash. +// If b is larger than len(h), b will be cropped from the left. func BytesToHash(b []byte) Hash { var h Hash h.SetBytes(b) return h } + +// BigToHash sets byte representation of b to hash. +// If b is larger than len(h), b will be cropped from the left. func BigToHash(b *big.Int) Hash { return BytesToHash(b.Bytes()) } -func HexToHash(s string) Hash { return BytesToHash(FromHex(s)) } -// Get the string representation of the underlying hash -func (h Hash) Str() string { return string(h[:]) } +// HexToHash sets byte representation of s to hash. +// If b is larger than len(h), b will be cropped from the left. +func HexToHash(s string) Hash { return BytesToHash(FromHex(s)) } + +// Bytes gets the byte representation of the underlying hash. func (h Hash) Bytes() []byte { return h[:] } + +// Big converts a hash to a big integer. func (h Hash) Big() *big.Int { return new(big.Int).SetBytes(h[:]) } -func (h Hash) Hex() string { return hexutil.Encode(h[:]) } + +// Hex converts a hash to a hex string. +func (h Hash) Hex() string { return hexutil.Encode(h[:]) } // TerminalString implements log.TerminalStringer, formatting a string for console // output during logging. @@ -89,7 +100,8 @@ func (h Hash) MarshalText() ([]byte, error) { return hexutil.Bytes(h[:]).MarshalText() } -// Sets the hash to the value of b. If b is larger than len(h), 'b' will be cropped (from the left). +// SetBytes sets the hash to the value of b. +// If b is larger than len(h), b will be cropped from the left. func (h *Hash) SetBytes(b []byte) { if len(b) > len(h) { b = b[len(b)-HashLength:] @@ -98,16 +110,6 @@ func (h *Hash) SetBytes(b []byte) { copy(h[HashLength-len(b):], b) } -// Set string `s` to h. If s is larger than len(h) s will be cropped (from left) to fit. -func (h *Hash) SetString(s string) { h.SetBytes([]byte(s)) } - -// Sets h to other -func (h *Hash) Set(other Hash) { - for i, v := range other { - h[i] = v - } -} - // Generate implements testing/quick.Generator. func (h Hash) Generate(rand *rand.Rand, size int) reflect.Value { m := rand.Intn(len(h)) @@ -117,10 +119,6 @@ func (h Hash) Generate(rand *rand.Rand, size int) reflect.Value { return reflect.ValueOf(h) } -func EmptyHash(h Hash) bool { - return h == Hash{} -} - // UnprefixedHash allows marshaling a Hash without 0x prefix. type UnprefixedHash Hash @@ -139,13 +137,21 @@ func (h UnprefixedHash) MarshalText() ([]byte, error) { // Address represents the 20 byte address of an Ethereum account. type Address [AddressLength]byte +// BytesToAddress returns Address with value b. +// If b is larger than len(h), b will be cropped from the left. func BytesToAddress(b []byte) Address { var a Address a.SetBytes(b) return a } + +// BigToAddress returns Address with byte values of b. +// If b is larger than len(h), b will be cropped from the left. func BigToAddress(b *big.Int) Address { return BytesToAddress(b.Bytes()) } -func HexToAddress(s string) Address { return BytesToAddress(FromHex(s)) } + +// HexToAddress returns Address with byte values of s. +// If s is larger than len(h), s will be cropped from the left. +func HexToAddress(s string) Address { return BytesToAddress(FromHex(s)) } // IsHexAddress verifies whether a string can represent a valid hex-encoded // Ethereum address or not. @@ -156,11 +162,14 @@ func IsHexAddress(s string) bool { return len(s) == 2*AddressLength && isHex(s) } -// Get the string representation of the underlying address -func (a Address) Str() string { return string(a[:]) } +// Bytes gets the string representation of the underlying address. func (a Address) Bytes() []byte { return a[:] } + +// Big converts an address to a big integer. func (a Address) Big() *big.Int { return new(big.Int).SetBytes(a[:]) } -func (a Address) Hash() Hash { return BytesToHash(a[:]) } + +// Hash converts an address to a hash by left-padding it with zeros. +func (a Address) Hash() Hash { return BytesToHash(a[:]) } // Hex returns an EIP55-compliant hex string representation of the address. func (a Address) Hex() string { @@ -184,7 +193,7 @@ func (a Address) Hex() string { return "0x" + string(result) } -// String implements the stringer interface and is used also by the logger. +// String implements fmt.Stringer. func (a Address) String() string { return a.Hex() } @@ -195,7 +204,8 @@ func (a Address) Format(s fmt.State, c rune) { fmt.Fprintf(s, "%"+string(c), a[:]) } -// Sets the address to the value of b. If b is larger than len(a) it will panic +// SetBytes sets the address to the value of b. +// If b is larger than len(a) it will panic. func (a *Address) SetBytes(b []byte) { if len(b) > len(a) { b = b[len(b)-AddressLength:] @@ -203,16 +213,6 @@ func (a *Address) SetBytes(b []byte) { copy(a[AddressLength-len(b):], b) } -// Set string `s` to a. If s is larger than len(a) it will panic -func (a *Address) SetString(s string) { a.SetBytes([]byte(s)) } - -// Sets a to other -func (a *Address) Set(other Address) { - for i, v := range other { - a[i] = v - } -} - // MarshalText returns the hex representation of a. func (a Address) MarshalText() ([]byte, error) { return hexutil.Bytes(a[:]).MarshalText() @@ -228,7 +228,7 @@ func (a *Address) UnmarshalJSON(input []byte) error { return hexutil.UnmarshalFixedJSON(addressT, input, a[:]) } -// UnprefixedHash allows marshaling an Address without 0x prefix. +// UnprefixedAddress allows marshaling an Address without 0x prefix. type UnprefixedAddress Address // UnmarshalText decodes the address from hex. The 0x prefix is optional. diff --git a/common/types_template.go b/common/types_template.go deleted file mode 100644 index 9a8f29977b..0000000000 --- a/common/types_template.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// +build none -//sed -e 's/_N_/Hash/g' -e 's/_S_/32/g' -e '1d' types_template.go | gofmt -w hash.go - -package common - -import "math/big" - -type _N_ [_S_]byte - -func BytesTo_N_(b []byte) _N_ { - var h _N_ - h.SetBytes(b) - return h -} -func StringTo_N_(s string) _N_ { return BytesTo_N_([]byte(s)) } -func BigTo_N_(b *big.Int) _N_ { return BytesTo_N_(b.Bytes()) } -func HexTo_N_(s string) _N_ { return BytesTo_N_(FromHex(s)) } - -// Don't use the default 'String' method in case we want to overwrite - -// Get the string representation of the underlying hash -func (h _N_) Str() string { return string(h[:]) } -func (h _N_) Bytes() []byte { return h[:] } -func (h _N_) Big() *big.Int { return new(big.Int).SetBytes(h[:]) } -func (h _N_) Hex() string { return "0x" + Bytes2Hex(h[:]) } - -// Sets the hash to the value of b. If b is larger than len(h) it will panic -func (h *_N_) SetBytes(b []byte) { - // Use the right most bytes - if len(b) > len(h) { - b = b[len(b)-_S_:] - } - - // Reverse the loop - for i := len(b) - 1; i >= 0; i-- { - h[_S_-len(b)+i] = b[i] - } -} - -// Set string `s` to h. If s is larger than len(h) it will panic -func (h *_N_) SetString(s string) { h.SetBytes([]byte(s)) } - -// Sets h to other -func (h *_N_) Set(other _N_) { - for i, v := range other { - h[i] = v - } -} diff --git a/contracts/ens/ens.go b/contracts/ens/ens.go index 06045a5cd8..329109b943 100644 --- a/contracts/ens/ens.go +++ b/contracts/ens/ens.go @@ -100,6 +100,11 @@ func ensNode(name string) common.Hash { return crypto.Keccak256Hash(parentNode[:], parentLabel[:]) } +// Suggest exporting ensNode so external code can use it for generating ens namehashes +func EnsNode(name string) common.Hash { + return ensNode(name) +} + func (self *ENS) getResolver(node [32]byte) (*contract.PublicResolverSession, error) { resolverAddr, err := self.Resolver(node) if err != nil { diff --git a/core/state/state_test.go b/core/state/state_test.go index 12778f6f12..123559ea9b 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -99,7 +99,7 @@ func (s *StateSuite) TestNull(c *checker.C) { s.state.SetState(address, common.Hash{}, value) s.state.Commit(false) value = s.state.GetState(address, common.Hash{}) - if !common.EmptyHash(value) { + if value != (common.Hash{}) { c.Errorf("expected empty hash. got %x", value) } } diff --git a/core/state/sync.go b/core/state/sync.go index 28fcf6ae05..c566e79073 100644 --- a/core/state/sync.go +++ b/core/state/sync.go @@ -25,8 +25,8 @@ import ( ) // NewStateSync create a new state trie download scheduler. -func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.TrieSync { - var syncer *trie.TrieSync +func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.Sync { + var syncer *trie.Sync callback := func(leaf []byte, parent common.Hash) error { var obj Account if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil { @@ -36,6 +36,6 @@ func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.TrieSync syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent) return nil } - syncer = trie.NewTrieSync(root, database, callback) + syncer = trie.NewSync(root, database, callback) return syncer } diff --git a/core/tx_pool.go b/core/tx_pool.go index 1c9516b1b7..7393c8286f 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -962,7 +962,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { } // Notify subsystem for new promoted transactions. if len(promoted) > 0 { - pool.txFeed.Send(NewTxsEvent{promoted}) + go pool.txFeed.Send(NewTxsEvent{promoted}) } // If the pending limit is overflown, start equalizing allowances pending := uint64(0) diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index d1861b14cd..b390f45c67 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -165,28 +165,13 @@ func TestTransactionPriceNonceSort(t *testing.T) { t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce()) } } - // Find the previous and next nonce of this account - prev, next := i-1, i+1 - for j := i - 1; j >= 0; j-- { - if fromj, _ := Sender(signer, txs[j]); fromi == fromj { - prev = j - break - } - } - for j := i + 1; j < len(txs); j++ { - if fromj, _ := Sender(signer, txs[j]); fromi == fromj { - next = j - break - } - } - // Make sure that in between the neighbor nonces, the transaction is correctly positioned price wise - for j := prev + 1; j < next; j++ { - fromj, _ := Sender(signer, txs[j]) - if j < i && txs[j].GasPrice().Cmp(txi.GasPrice()) < 0 { - t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice()) - } - if j > i && txs[j].GasPrice().Cmp(txi.GasPrice()) > 0 { - t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) > tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice()) + + // If the next tx has different from account, the price must be lower than the current one + if i+1 < len(txs) { + next := txs[i+1] + fromNext, _ := Sender(signer, next) + if fromi != fromNext && txi.GasPrice().Cmp(next.GasPrice()) < 0 { + t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice()) } } } diff --git a/core/vm/gas_table.go b/core/vm/gas_table.go index 83adba428e..0764c67a4d 100644 --- a/core/vm/gas_table.go +++ b/core/vm/gas_table.go @@ -124,12 +124,12 @@ func gasSStore(gt params.GasTable, evm *EVM, contract *Contract, stack *Stack, m // 1. From a zero-value address to a non-zero value (NEW VALUE) // 2. From a non-zero value address to a zero-value address (DELETE) // 3. From a non-zero to a non-zero (CHANGE) - if common.EmptyHash(val) && !common.EmptyHash(common.BigToHash(y)) { + if val == (common.Hash{}) && y.Sign() != 0 { // 0 => non 0 return params.SstoreSetGas, nil - } else if !common.EmptyHash(val) && common.EmptyHash(common.BigToHash(y)) { + } else if val != (common.Hash{}) && y.Sign() == 0 { + // non 0 => 0 evm.StateDB.AddRefund(params.SstoreRefundGas) - return params.SstoreClearGas, nil } else { // non 0 => non 0 (or 0 => 0) diff --git a/core/vm/jump_table.go b/core/vm/jump_table.go index 3389941353..49a94d9646 100644 --- a/core/vm/jump_table.go +++ b/core/vm/jump_table.go @@ -33,7 +33,7 @@ type ( var errGasUintOverflow = errors.New("gas uint64 overflow") type operation struct { - // op is the operation function + // execute is the operation function execute executionFunc // gasCost is the gas function and returns the gas required for execution gasCost gasFunc diff --git a/eth/backend.go b/eth/backend.go index ea70e3826c..e07d5efc9a 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -215,14 +215,14 @@ func CreateConsensusEngine(ctx *node.ServiceContext, config *ethash.Config, chai return clique.New(chainConfig.Clique, db) } // Otherwise assume proof-of-work - switch { - case config.PowMode == ethash.ModeFake: + switch config.PowMode { + case ethash.ModeFake: log.Warn("Ethash used in fake mode") return ethash.NewFaker() - case config.PowMode == ethash.ModeTest: + case ethash.ModeTest: log.Warn("Ethash used in test mode") return ethash.NewTester() - case config.PowMode == ethash.ModeShared: + case ethash.ModeShared: log.Warn("Ethash used in shared mode") return ethash.NewShared() default: @@ -239,7 +239,7 @@ func CreateConsensusEngine(ctx *node.ServiceContext, config *ethash.Config, chai } } -// APIs returns the collection of RPC services the ethereum package offers. +// APIs return the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. func (s *Ethereum) APIs() []rpc.API { apis := ethapi.GetAPIs(s.APIBackend) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index dc23354929..51c5936015 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -680,7 +680,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err } } // If the head fetch already found an ancestor, return - if !common.EmptyHash(hash) { + if hash != (common.Hash{}) { if int64(number) <= floor { p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor) return 0, errInvalidAncestor diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index 5b4b9ba1b7..8d33dfec74 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -214,7 +214,7 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { type stateSync struct { d *Downloader // Downloader instance to access and manage current peerset - sched *trie.TrieSync // State trie sync scheduler defining the tasks + sched *trie.Sync // State trie sync scheduler defining the tasks keccak hash.Hash // Keccak256 hasher to verify deliveries with tasks map[common.Hash]*stateTask // Set of tasks currently queued for retrieval diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 0c679cec3a..a2e7cdecf9 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -292,20 +292,20 @@ func (f *Fetcher) loop() { height := f.chainHeight() for !f.queue.Empty() { op := f.queue.PopItem().(*inject) + hash := op.block.Hash() if f.queueChangeHook != nil { - f.queueChangeHook(op.block.Hash(), false) + f.queueChangeHook(hash, false) } // If too high up the chain or phase, continue later number := op.block.NumberU64() if number > height+1 { - f.queue.Push(op, -float32(op.block.NumberU64())) + f.queue.Push(op, -float32(number)) if f.queueChangeHook != nil { - f.queueChangeHook(op.block.Hash(), true) + f.queueChangeHook(hash, true) } break } // Otherwise if fresh and still unknown, try and import - hash := op.block.Hash() if number+maxUncleDist < height || f.getBlock(hash) != nil { f.forgetBlock(hash) continue diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index c116014351..a4d35044cd 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -362,7 +362,7 @@ type nodeInfo struct { // authMsg is the authentication infos needed to login to a monitoring server. type authMsg struct { - Id string `json:"id"` + ID string `json:"id"` Info nodeInfo `json:"info"` Secret string `json:"secret"` } @@ -381,7 +381,7 @@ func (s *Service) login(conn *websocket.Conn) error { protocol = fmt.Sprintf("les/%d", les.ClientProtocolVersions[0]) } auth := &authMsg{ - Id: s.node, + ID: s.node, Info: nodeInfo{ Name: s.node, Node: infos.Name, diff --git a/interfaces.go b/interfaces.go index 1ae1eba48a..a8b48c93d7 100644 --- a/interfaces.go +++ b/interfaces.go @@ -144,7 +144,7 @@ type FilterQuery struct { // {} or nil matches any topic list // {{A}} matches topic A in first position // {{}, {B}} matches any topic in first position, B in second position - // {{A}}, {B}} matches topic A in first position, B in second position + // {{A}, {B}} matches topic A in first position, B in second position // {{A, B}}, {C, D}} matches topic (A OR B) in first position, (C OR D) in second position Topics [][]common.Hash } diff --git a/log/format.go b/log/format.go index fb1ea1a7b4..bed32bd2dc 100644 --- a/log/format.go +++ b/log/format.go @@ -15,7 +15,7 @@ import ( const ( timeFormat = "2006-01-02T15:04:05-0700" - termTimeFormat = "01-02|15:04:05" + termTimeFormat = "01-02|15:04:05.999999" floatFormat = 'f' termMsgJust = 40 ) diff --git a/log/logger.go b/log/logger.go index a2fe6dc580..438aa548fa 100644 --- a/log/logger.go +++ b/log/logger.go @@ -12,6 +12,7 @@ const timeKey = "t" const lvlKey = "lvl" const msgKey = "msg" const errorKey = "LOG15_ERROR" +const skipLevel = 2 type Lvl int @@ -127,13 +128,13 @@ type logger struct { h *swapHandler } -func (l *logger) write(msg string, lvl Lvl, ctx []interface{}) { +func (l *logger) write(msg string, lvl Lvl, ctx []interface{}, skip int) { l.h.Log(&Record{ Time: time.Now(), Lvl: lvl, Msg: msg, Ctx: newContext(l.ctx, ctx), - Call: stack.Caller(2), + Call: stack.Caller(skip), KeyNames: RecordKeyNames{ Time: timeKey, Msg: msgKey, @@ -157,27 +158,27 @@ func newContext(prefix []interface{}, suffix []interface{}) []interface{} { } func (l *logger) Trace(msg string, ctx ...interface{}) { - l.write(msg, LvlTrace, ctx) + l.write(msg, LvlTrace, ctx, skipLevel) } func (l *logger) Debug(msg string, ctx ...interface{}) { - l.write(msg, LvlDebug, ctx) + l.write(msg, LvlDebug, ctx, skipLevel) } func (l *logger) Info(msg string, ctx ...interface{}) { - l.write(msg, LvlInfo, ctx) + l.write(msg, LvlInfo, ctx, skipLevel) } func (l *logger) Warn(msg string, ctx ...interface{}) { - l.write(msg, LvlWarn, ctx) + l.write(msg, LvlWarn, ctx, skipLevel) } func (l *logger) Error(msg string, ctx ...interface{}) { - l.write(msg, LvlError, ctx) + l.write(msg, LvlError, ctx, skipLevel) } func (l *logger) Crit(msg string, ctx ...interface{}) { - l.write(msg, LvlCrit, ctx) + l.write(msg, LvlCrit, ctx, skipLevel) os.Exit(1) } diff --git a/log/root.go b/log/root.go index 71b8cef6d4..0682316389 100644 --- a/log/root.go +++ b/log/root.go @@ -31,31 +31,36 @@ func Root() Logger { // Trace is a convenient alias for Root().Trace func Trace(msg string, ctx ...interface{}) { - root.write(msg, LvlTrace, ctx) + root.write(msg, LvlTrace, ctx, skipLevel) } // Debug is a convenient alias for Root().Debug func Debug(msg string, ctx ...interface{}) { - root.write(msg, LvlDebug, ctx) + root.write(msg, LvlDebug, ctx, skipLevel) } // Info is a convenient alias for Root().Info func Info(msg string, ctx ...interface{}) { - root.write(msg, LvlInfo, ctx) + root.write(msg, LvlInfo, ctx, skipLevel) } // Warn is a convenient alias for Root().Warn func Warn(msg string, ctx ...interface{}) { - root.write(msg, LvlWarn, ctx) + root.write(msg, LvlWarn, ctx, skipLevel) } // Error is a convenient alias for Root().Error func Error(msg string, ctx ...interface{}) { - root.write(msg, LvlError, ctx) + root.write(msg, LvlError, ctx, skipLevel) } // Crit is a convenient alias for Root().Crit func Crit(msg string, ctx ...interface{}) { - root.write(msg, LvlCrit, ctx) + root.write(msg, LvlCrit, ctx, skipLevel) os.Exit(1) } + +// Output is a convenient alias for write +func Output(msg string, lvl Lvl, skip int, ctx ...interface{}) { + root.write(msg, lvl, ctx, skip) +} diff --git a/metrics/timer_test.go b/metrics/timer_test.go index c1f0ff9388..8638a2270b 100644 --- a/metrics/timer_test.go +++ b/metrics/timer_test.go @@ -47,8 +47,8 @@ func TestTimerStop(t *testing.T) { func TestTimerFunc(t *testing.T) { tm := NewTimer() tm.Time(func() { time.Sleep(50e6) }) - if max := tm.Max(); 35e6 > max || max > 95e6 { - t.Errorf("tm.Max(): 35e6 > %v || %v > 95e6\n", max, max) + if max := tm.Max(); 35e6 > max || max > 145e6 { + t.Errorf("tm.Max(): 35e6 > %v || %v > 145e6\n", max, max) } } diff --git a/node/doc.go b/node/doc.go index d9688e0a12..41a88c19a1 100644 --- a/node/doc.go +++ b/node/doc.go @@ -59,7 +59,7 @@ using the same data directory will store this information in different subdirect the data directory. LevelDB databases are also stored within the instance subdirectory. If multiple node -instances use the same data directory, openening the databases with identical names will +instances use the same data directory, opening the databases with identical names will create one database for each instance. The account key store is shared among all node instances using the same data directory @@ -84,7 +84,7 @@ directory. Mode instance A opens the database "db", node instance B opens the da static-nodes.json -- devp2p static node list of instance B db/ -- LevelDB content for "db" db-2/ -- LevelDB content for "db-2" - B.ipc -- JSON-RPC UNIX domain socket endpoint of instance A + B.ipc -- JSON-RPC UNIX domain socket endpoint of instance B keystore/ -- account key store, used by both instances */ package node diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 6509326e69..18920ccfdd 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -480,16 +480,16 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { b := tab.buckets[bi] if err == nil { // The node responded, move it to the front. - log.Debug("Revalidated node", "b", bi, "id", last.ID) + log.Trace("Revalidated node", "b", bi, "id", last.ID) b.bump(last) return } // No reply received, pick a replacement or delete the node if there aren't // any replacements. if r := tab.replace(b, last); r != nil { - log.Debug("Replaced dead node", "b", bi, "id", last.ID, "ip", last.IP, "r", r.ID, "rip", r.IP) + log.Trace("Replaced dead node", "b", bi, "id", last.ID, "ip", last.IP, "r", r.ID, "rip", r.IP) } else { - log.Debug("Removed dead node", "b", bi, "id", last.ID, "ip", last.IP) + log.Trace("Removed dead node", "b", bi, "id", last.ID, "ip", last.IP) } } diff --git a/p2p/discv5/metrics.go b/p2p/discv5/metrics.go new file mode 100644 index 0000000000..cb11d7eacf --- /dev/null +++ b/p2p/discv5/metrics.go @@ -0,0 +1,8 @@ +package discv5 + +import "github.com/ethereum/go-ethereum/metrics" + +var ( + ingressTrafficMeter = metrics.NewRegisteredMeter("discv5/InboundTraffic", nil) + egressTrafficMeter = metrics.NewRegisteredMeter("discv5/OutboundTraffic", nil) +) diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go index 09e5f8b374..49e1cb811a 100644 --- a/p2p/discv5/udp.go +++ b/p2p/discv5/udp.go @@ -334,8 +334,10 @@ func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req inter return hash, err } log.Trace(fmt.Sprintf(">>> %v to %x@%v", nodeEvent(ptype), toid[:8], toaddr)) - if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil { + if nbytes, err := t.conn.WriteToUDP(packet, toaddr); err != nil { log.Trace(fmt.Sprint("UDP send failed:", err)) + } else { + egressTrafficMeter.Mark(int64(nbytes)) } //fmt.Println(err) return hash, err @@ -374,6 +376,7 @@ func (t *udp) readLoop() { buf := make([]byte, 1280) for { nbytes, from, err := t.conn.ReadFromUDP(buf) + ingressTrafficMeter.Mark(int64(nbytes)) if netutil.IsTemporaryError(err) { // Ignore temporary read errors. log.Debug(fmt.Sprintf("Temporary read error: %v", err)) diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 9914c99587..849a7ef399 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -33,7 +33,9 @@ import ( "fmt" "reflect" "sync" + "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" ) @@ -217,6 +219,8 @@ func (p *Peer) Drop(err error) { // this low level call will be wrapped by libraries providing routed or broadcast sends // but often just used to forward and push messages to directly connected peers func (p *Peer) Send(msg interface{}) error { + defer metrics.GetOrRegisterResettingTimer("peer.send_t", nil).UpdateSince(time.Now()) + metrics.GetOrRegisterCounter("peer.send", nil).Inc(1) code, found := p.spec.GetCode(msg) if !found { return errorf(ErrInvalidMsgType, "%v", code) diff --git a/p2p/protocols/protocol_test.go b/p2p/protocols/protocol_test.go index 053f537a62..aaae7502b5 100644 --- a/p2p/protocols/protocol_test.go +++ b/p2p/protocols/protocol_test.go @@ -373,15 +373,14 @@ WAIT: } } - -func TestMultiplePeersDropSelf(t *testing.T) { +func XTestMultiplePeersDropSelf(t *testing.T) { runMultiplePeers(t, 0, fmt.Errorf("subprotocol error"), fmt.Errorf("Message handler error: (msg code 3): dropped"), ) } -func TestMultiplePeersDropOther(t *testing.T) { +func XTestMultiplePeersDropOther(t *testing.T) { runMultiplePeers(t, 1, fmt.Errorf("Message handler error: (msg code 3): dropped"), fmt.Errorf("subprotocol error"), diff --git a/p2p/rlpx_test.go b/p2p/rlpx_test.go index bca4604021..7ae8007740 100644 --- a/p2p/rlpx_test.go +++ b/p2p/rlpx_test.go @@ -35,6 +35,7 @@ import ( "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/crypto/sha3" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/pipes" "github.com/ethereum/go-ethereum/rlp" ) @@ -159,7 +160,7 @@ func TestProtocolHandshake(t *testing.T) { wg sync.WaitGroup ) - fd0, fd1, err := tcpPipe() + fd0, fd1, err := pipes.TCPPipe() if err != nil { t.Fatal(err) } @@ -601,31 +602,3 @@ func TestHandshakeForwardCompatibility(t *testing.T) { t.Errorf("ingress-mac('foo') mismatch:\ngot %x\nwant %x", fooIngressHash, wantFooIngressHash) } } - -// tcpPipe creates an in process full duplex pipe based on a localhost TCP socket -func tcpPipe() (net.Conn, net.Conn, error) { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return nil, nil, err - } - defer l.Close() - - var aconn net.Conn - aerr := make(chan error, 1) - go func() { - var err error - aconn, err = l.Accept() - aerr <- err - }() - - dconn, err := net.Dial("tcp", l.Addr().String()) - if err != nil { - <-aerr - return nil, nil, err - } - if err := <-aerr; err != nil { - dconn.Close() - return nil, nil, err - } - return aconn, dconn, nil -} diff --git a/p2p/server.go b/p2p/server.go index c41d1dc156..cdb5b1926e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -594,13 +594,13 @@ running: // This channel is used by AddPeer to add to the // ephemeral static peer list. Add it to the dialer, // it will keep the node connected. - srv.log.Debug("Adding static node", "node", n) + srv.log.Trace("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: // This channel is used by RemovePeer to send a // disconnect request to a peer and begin the // stop keeping the node connected - srv.log.Debug("Removing static node", "node", n) + srv.log.Trace("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go index 8ef5629fb5..d145c46b3a 100644 --- a/p2p/simulations/adapters/docker.go +++ b/p2p/simulations/adapters/docker.go @@ -28,11 +28,14 @@ import ( "strings" "github.com/docker/docker/pkg/reexec" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/discover" ) +var ( + ErrLinuxOnly = errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") +) + // DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker // containers. // @@ -52,7 +55,7 @@ func NewDockerAdapter() (*DockerAdapter, error) { // It is reasonable to require this because the caller can just // compile the current binary in a Docker container. if runtime.GOOS != "linux" { - return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") + return nil, ErrLinuxOnly } if err := buildDockerImage(); err != nil { @@ -95,7 +98,10 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NoDiscovery = true conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true - conf.Stack.Logger = log.New("node.id", config.ID.String()) + + // listen on all interfaces on a given port, which we set when we + // initialise NodeConfig (usually a random port) + conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port) node := &DockerNode{ ExecNode: ExecNode{ diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index f381c11596..e64cebc2a7 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -17,6 +17,7 @@ package adapters import ( + "bufio" "context" "crypto/ecdsa" "encoding/json" @@ -103,9 +104,9 @@ func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { conf.Stack.P2P.NAT = nil conf.Stack.NoUSB = true - // listen on a random localhost port (we'll get the actual port after - // starting the node through the RPC admin.nodeInfo method) - conf.Stack.P2P.ListenAddr = "127.0.0.1:0" + // listen on a localhost port, which we set when we + // initialise NodeConfig (usually a random port) + conf.Stack.P2P.ListenAddr = fmt.Sprintf(":%d", config.Port) node := &ExecNode{ ID: config.ID, @@ -190,9 +191,23 @@ func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { n.Cmd = cmd // read the WebSocket address from the stderr logs - wsAddr, err := findWSAddr(stderrR, 10*time.Second) - if err != nil { - return fmt.Errorf("error getting WebSocket address: %s", err) + var wsAddr string + wsAddrC := make(chan string) + go func() { + s := bufio.NewScanner(stderrR) + for s.Scan() { + if strings.Contains(s.Text(), "WebSocket endpoint opened") { + wsAddrC <- wsAddrPattern.FindString(s.Text()) + } + } + }() + select { + case wsAddr = <-wsAddrC: + if wsAddr == "" { + return errors.New("failed to read WebSocket address from stderr") + } + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for WebSocket address on stderr") } // create the RPC client and load the node info @@ -318,6 +333,21 @@ type execNodeConfig struct { PeerAddrs map[string]string `json:"peer_addrs,omitempty"` } +// ExternalIP gets an external IP address so that Enode URL is usable +func ExternalIP() net.IP { + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Crit("error getting IP address", "err", err) + } + for _, addr := range addrs { + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() && !ip.IP.IsLinkLocalUnicast() { + return ip.IP + } + } + log.Warn("unable to determine explicit IP address, falling back to loopback") + return net.IP{127, 0, 0, 1} +} + // execP2PNode starts a devp2p node when the current binary is executed with // argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] // and the node config from the _P2P_NODE_CONFIG environment variable @@ -341,25 +371,11 @@ func execP2PNode() { conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey conf.Stack.Logger = log.New("node.id", conf.Node.ID.String()) - // use explicit IP address in ListenAddr so that Enode URL is usable - externalIP := func() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - log.Crit("error getting IP address", "err", err) - } - for _, addr := range addrs { - if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { - return ip.IP.String() - } - } - log.Crit("unable to determine explicit IP address") - return "" - } if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { - conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr + conf.Stack.P2P.ListenAddr = ExternalIP().String() + conf.Stack.P2P.ListenAddr } if conf.Stack.WSHost == "0.0.0.0" { - conf.Stack.WSHost = externalIP() + conf.Stack.WSHost = ExternalIP().String() } // initialize the devp2p stack diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index 6d90b4a9fc..99904c8766 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -28,12 +28,14 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/pipes" "github.com/ethereum/go-ethereum/rpc" ) // SimAdapter is a NodeAdapter which creates in-memory simulation nodes and -// connects them using in-memory net.Pipe connections +// connects them using net.Pipe or OS socket connections type SimAdapter struct { + pipe func() (net.Conn, net.Conn, error) mtx sync.RWMutex nodes map[discover.NodeID]*SimNode services map[string]ServiceFunc @@ -42,8 +44,30 @@ type SimAdapter struct { // NewSimAdapter creates a SimAdapter which is capable of running in-memory // simulation nodes running any of the given services (the services to run on a // particular node are passed to the NewNode function in the NodeConfig) +// the adapter uses a net.Pipe for in-memory simulated network connections func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter { return &SimAdapter{ + pipe: pipes.NetPipe, + nodes: make(map[discover.NodeID]*SimNode), + services: services, + } +} + +// NewSocketAdapter creates a SimAdapter which is capable of running in-memory +// simulation nodes running any of the given services (the services to run on a +// particular node are passed to the NewNode function in the NodeConfig) +// the adapter uses a OS socketpairs for in-memory simulated network connections +func NewSocketAdapter(services map[string]ServiceFunc) *SimAdapter { + return &SimAdapter{ + pipe: pipes.SocketPipe, + nodes: make(map[discover.NodeID]*SimNode), + services: services, + } +} + +func NewTCPAdapter(services map[string]ServiceFunc) *SimAdapter { + return &SimAdapter{ + pipe: pipes.TCPPipe, nodes: make(map[discover.NodeID]*SimNode), services: services, } @@ -81,7 +105,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { MaxPeers: math.MaxInt32, NoDiscovery: true, Dialer: s, - EnableMsgEvents: true, + EnableMsgEvents: config.EnableMsgEvents, }, NoUSB: true, Logger: log.New("node.id", id.String()), @@ -102,7 +126,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { } // Dial implements the p2p.NodeDialer interface by connecting to the node using -// an in-memory net.Pipe connection +// an in-memory net.Pipe or OS socket connection func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { node, ok := s.GetNode(dest.ID) if !ok { @@ -112,7 +136,14 @@ func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { if srv == nil { return nil, fmt.Errorf("node not running: %s", dest.ID) } - pipe1, pipe2 := net.Pipe() + // SimAdapter.pipe is either net.Pipe (NewSimAdapter) or socketPipe (NewSocketAdapter) + pipe1, pipe2, err := s.pipe() + if err != nil { + return nil, err + } + // this is simulated 'listening' + // asynchronously call the dialed destintion node's p2p server + // to set up connection on the 'listening' side go srv.SetupConn(pipe1, 0, nil) return pipe2, nil } @@ -140,7 +171,7 @@ func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { } // SimNode is an in-memory simulation node which connects to other nodes using -// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p +// net.Pipe or OS socket connection (see SimAdapter.Dial), running devp2p // protocols directly over that pipe type SimNode struct { lock sync.RWMutex @@ -241,7 +272,7 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error { for _, name := range sn.config.Services { if err := sn.node.Register(newService(name)); err != nil { regErr = err - return + break } } }) @@ -314,3 +345,18 @@ func (sn *SimNode) NodeInfo() *p2p.NodeInfo { } return server.NodeInfo() } + +func setSocketBuffer(conn net.Conn, socketReadBuffer int, socketWriteBuffer int) error { + switch v := conn.(type) { + case *net.UnixConn: + err := v.SetReadBuffer(socketReadBuffer) + if err != nil { + return err + } + err = v.SetWriteBuffer(socketWriteBuffer) + if err != nil { + return err + } + } + return nil +} diff --git a/p2p/simulations/adapters/inproc_test.go b/p2p/simulations/adapters/inproc_test.go new file mode 100644 index 0000000000..3cc3f5434a --- /dev/null +++ b/p2p/simulations/adapters/inproc_test.go @@ -0,0 +1,372 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package adapters + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/simulations/pipes" +) + +func TestSocketPipe(t *testing.T) { + c1, c2, err := pipes.SocketPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 20 + size := 8 + + // OS socket pipe is blocking (depending on buffer size on OS), so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) + } + } + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestSocketPipeBidirections(t *testing.T) { + c1, c2, err := pipes.SocketPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 100 + size := 4 + + // OS socket pipe is blocking (depending on buffer size on OS), so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := []byte(`ping`) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + for i := 0; i < msgs; i++ { + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if bytes.Equal(out, []byte(`ping`)) { + msg := []byte(`pong`) + _, err := c2.Write(msg) + if err != nil { + t.Fatal(err) + } + } + } + + for i := 0; i < msgs; i++ { + expected := []byte(`pong`) + + out := make([]byte, size) + _, err := c1.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, expected) { + t.Fatalf("expected %#v, got %#v", expected, out) + } + } + + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestTcpPipe(t *testing.T) { + c1, c2, err := pipes.TCPPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 1024 + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) + } + } + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestTcpPipeBidirections(t *testing.T) { + c1, c2, err := pipes.TCPPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 7 + for i := 0; i < msgs; i++ { + msg := []byte(fmt.Sprintf("ping %02d", i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("ping %02d", i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) + } else { + msg := []byte(fmt.Sprintf("pong %02d", i)) + _, err := c2.Write(msg) + if err != nil { + t.Fatal(err) + } + } + } + + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf("pong %02d", i)) + + out := make([]byte, size) + _, err := c1.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", out, expected) + } + } + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestNetPipe(t *testing.T) { + c1, c2, err := pipes.NetPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 50 + size := 1024 + // netPipe is blocking, so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + for i := 0; i < msgs; i++ { + msg := make([]byte, size) + _ = binary.PutUvarint(msg, uint64(i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(msg, out) { + t.Fatalf("expected %#v, got %#v", msg, out) + } + } + + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} + +func TestNetPipeBidirections(t *testing.T) { + c1, c2, err := pipes.NetPipe() + if err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + + go func() { + msgs := 1000 + size := 8 + pingTemplate := "ping %03d" + pongTemplate := "pong %03d" + + // netPipe is blocking, so writes are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + msg := []byte(fmt.Sprintf(pingTemplate, i)) + + _, err := c1.Write(msg) + if err != nil { + t.Fatal(err) + } + } + }() + + // netPipe is blocking, so reads for pong are emitted asynchronously + go func() { + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pongTemplate, i)) + + out := make([]byte, size) + _, err := c1.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", expected, out) + } + } + + done <- struct{}{} + }() + + // expect to read pings, and respond with pongs to the alternate connection + for i := 0; i < msgs; i++ { + expected := []byte(fmt.Sprintf(pingTemplate, i)) + + out := make([]byte, size) + _, err := c2.Read(out) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(expected, out) { + t.Fatalf("expected %#v, got %#v", expected, out) + } else { + msg := []byte(fmt.Sprintf(pongTemplate, i)) + + _, err := c2.Write(msg) + if err != nil { + t.Fatal(err) + } + } + } + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("test timeout") + } +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index 5b4b47fe2f..2c4b9dd8f2 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "os" + "strconv" "github.com/docker/docker/pkg/reexec" "github.com/ethereum/go-ethereum/crypto" @@ -97,24 +98,30 @@ type NodeConfig struct { // function to sanction or prevent suggesting a peer Reachable func(id discover.NodeID) bool + + Port uint16 } // nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding // all fields as strings type nodeConfigJSON struct { - ID string `json:"id"` - PrivateKey string `json:"private_key"` - Name string `json:"name"` - Services []string `json:"services"` + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Services []string `json:"services"` + EnableMsgEvents bool `json:"enable_msg_events"` + Port uint16 `json:"port"` } // MarshalJSON implements the json.Marshaler interface by encoding the config // fields as strings func (n *NodeConfig) MarshalJSON() ([]byte, error) { confJSON := nodeConfigJSON{ - ID: n.ID.String(), - Name: n.Name, - Services: n.Services, + ID: n.ID.String(), + Name: n.Name, + Services: n.Services, + Port: n.Port, + EnableMsgEvents: n.EnableMsgEvents, } if n.PrivateKey != nil { confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) @@ -152,6 +159,8 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { n.Name = confJSON.Name n.Services = confJSON.Services + n.Port = confJSON.Port + n.EnableMsgEvents = confJSON.EnableMsgEvents return nil } @@ -163,13 +172,36 @@ func RandomNodeConfig() *NodeConfig { if err != nil { panic("unable to generate key") } - var id discover.NodeID - pubkey := crypto.FromECDSAPub(&key.PublicKey) - copy(id[:], pubkey[1:]) + + id := discover.PubkeyID(&key.PublicKey) + port, err := assignTCPPort() + if err != nil { + panic("unable to assign tcp port") + } return &NodeConfig{ - ID: id, - PrivateKey: key, + ID: id, + Name: fmt.Sprintf("node_%s", id.String()), + PrivateKey: key, + Port: port, + EnableMsgEvents: true, + } +} + +func assignTCPPort() (uint16, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + l.Close() + _, port, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + return 0, err + } + p, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, err } + return uint16(p), nil } // ServiceContext is a collection of options and methods which can be utilised diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 97dd742e88..24001f1949 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -561,7 +561,8 @@ func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { // CreateNode creates a node in the network using the given configuration func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { - config := adapters.RandomNodeConfig() + config := &adapters.NodeConfig{} + err := json.NewDecoder(req.Body).Decode(config) if err != nil && err != io.EOF { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index 677a8fb147..732d49f546 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -348,7 +348,8 @@ func startTestNetwork(t *testing.T, client *Client) []string { nodeCount := 2 nodeIDs := make([]string, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := client.CreateNode(nil) + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } @@ -527,7 +528,9 @@ func TestHTTPNodeRPC(t *testing.T) { // start a node in the network client := NewClient(s.URL) - node, err := client.CreateNode(nil) + + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } @@ -589,7 +592,8 @@ func TestHTTPSnapshot(t *testing.T) { nodeCount := 2 nodes := make([]*p2p.NodeInfo, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := client.CreateNode(nil) + config := adapters.RandomNodeConfig() + node, err := client.CreateNode(config) if err != nil { t.Fatalf("error creating node: %s", err) } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index c38e288552..389b1e3ec3 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) //a map of mocker names to its function @@ -102,7 +103,13 @@ func startStop(net *Network, quit chan struct{}, nodeCount int) { func probabilistic(net *Network, quit chan struct{}, nodeCount int) { nodes, err := connectNodesInRing(net, nodeCount) if err != nil { - panic("Could not startup node network for mocker") + select { + case <-quit: + //error may be due to abortion of mocking; so the quit channel is closed + return + default: + panic("Could not startup node network for mocker") + } } for { select { @@ -143,7 +150,7 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { log.Debug(fmt.Sprintf("node %v shutting down", nodes[i])) err := net.Stop(nodes[i]) if err != nil { - log.Error(fmt.Sprintf("Error stopping node %s", nodes[i])) + log.Error("Error stopping node", "node", nodes[i]) wg.Done() continue } @@ -151,7 +158,7 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { time.Sleep(randWait) err := net.Start(id) if err != nil { - log.Error(fmt.Sprintf("Error starting node %s", id)) + log.Error("Error starting node", "node", id) } wg.Done() }(nodes[i]) @@ -165,9 +172,10 @@ func probabilistic(net *Network, quit chan struct{}, nodeCount int) { func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) { ids := make([]discover.NodeID, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := net.NewNode() + conf := adapters.RandomNodeConfig() + node, err := net.NewNodeWithConfig(conf) if err != nil { - log.Error("Error creating a node! %s", err) + log.Error("Error creating a node!", "err", err) return nil, err } ids[i] = node.ID() @@ -175,7 +183,7 @@ func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) for _, id := range ids { if err := net.Start(id); err != nil { - log.Error("Error starting a node! %s", err) + log.Error("Error starting a node!", "err", err) return nil, err } log.Debug(fmt.Sprintf("node %v starting up", id)) @@ -183,7 +191,7 @@ func connectNodesInRing(net *Network, nodeCount int) ([]discover.NodeID, error) for i, id := range ids { peerID := ids[(i+1)%len(ids)] if err := net.Connect(id, peerID); err != nil { - log.Error("Error connecting a node to a peer! %s", err) + log.Error("Error connecting a node to a peer!", "err", err) return nil, err } } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index 1a2c1e8ff4..023129b0a3 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -382,6 +382,27 @@ func (net *Network) GetNodeByName(name string) *Node { return net.getNodeByName(name) } +// GetNodes returns the existing nodes +func (net *Network) GetNodes() (nodes []*Node) { + net.lock.Lock() + defer net.lock.Unlock() + + nodes = append(nodes, net.Nodes...) + return nodes +} + +// GetUpNodes returns the existing nodes that are up +func (net *Network) GetUpNodes() (nodes []*Node) { + net.lock.Lock() + defer net.lock.Unlock() + for _, n := range net.Nodes { + if n.Up { + nodes = append(nodes, n) + } + } + return nodes +} + func (net *Network) getNode(id discover.NodeID) *Node { i, found := net.nodeMap[id] if !found { @@ -399,15 +420,6 @@ func (net *Network) getNodeByName(name string) *Node { return nil } -// GetNodes returns the existing nodes -func (net *Network) GetNodes() (nodes []*Node) { - net.lock.Lock() - defer net.lock.Unlock() - - nodes = append(nodes, net.Nodes...) - return nodes -} - // GetConn returns the connection which exists between "one" and "other" // regardless of which node initiated the connection func (net *Network) GetConn(oneID, otherID discover.NodeID) *Conn { diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index 2a062121be..f178bac502 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -41,7 +41,8 @@ func TestNetworkSimulation(t *testing.T) { nodeCount := 20 ids := make([]discover.NodeID, nodeCount) for i := 0; i < nodeCount; i++ { - node, err := network.NewNode() + conf := adapters.RandomNodeConfig() + node, err := network.NewNodeWithConfig(conf) if err != nil { t.Fatalf("error creating node: %s", err) } diff --git a/p2p/simulations/pipes/pipes.go b/p2p/simulations/pipes/pipes.go new file mode 100644 index 0000000000..000cae7405 --- /dev/null +++ b/p2p/simulations/pipes/pipes.go @@ -0,0 +1,86 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pipes + +import ( + "crypto/rand" + "net" + "os" + "syscall" +) + +// NetPipe wraps net.Pipe in a signature returning an error +func NetPipe() (net.Conn, net.Conn, error) { + p1, p2 := net.Pipe() + return p1, p2, nil +} + +// TCPPipe creates an in process full duplex pipe based on a localhost TCP socket +func TCPPipe() (net.Conn, net.Conn, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, nil, err + } + defer l.Close() + + var aconn net.Conn + aerr := make(chan error, 1) + go func() { + var err error + aconn, err = l.Accept() + aerr <- err + }() + + dconn, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + <-aerr + return nil, nil, err + } + if err := <-aerr; err != nil { + dconn.Close() + return nil, nil, err + } + return aconn, dconn, nil +} + +// SocketPipe creates an in process full duplex pipe based on OS sockets +// credit to @lmars & Flynn +// https://github.com/flynn/flynn/blob/master/host/containerinit/init.go#L743-L749 +// using this in large simulations requires raising OS's max open file limit +func SocketPipe() (net.Conn, net.Conn, error) { + pair, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0) + if err != nil { + return nil, nil, err + } + nameb := make([]byte, 8) + _, err = rand.Read(nameb) + if err != nil { + return nil, nil, err + } + f1 := os.NewFile(uintptr(pair[0]), string(nameb)+".out") + f2 := os.NewFile(uintptr(pair[1]), string(nameb)+".in") + pipe1, err := net.FileConn(f1) + if err != nil { + return nil, nil, err + } + pipe2, err := net.FileConn(f2) + if err != nil { + return nil, nil, err + } + + return pipe1, pipe2, nil +} diff --git a/params/version.go b/params/version.go index 1e8c43bf81..8689ccba7f 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 9 // Patch version component of the current release + VersionPatch = 11 // Patch version component of the current release VersionMeta = "unstable" // Version metadata to append to the version string ) diff --git a/rpc/client.go b/rpc/client.go index 77b4d5ee01..1c88cfab84 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -61,7 +61,7 @@ const ( // The approach taken here is to maintain a per-subscription linked list buffer // shrinks on demand. If the buffer reaches the size below, the subscription is // dropped. - maxClientSubscriptionBuffer = 8000 + maxClientSubscriptionBuffer = 20000 ) // BatchElem is an element in a batch request. diff --git a/trie/encoding.go b/trie/encoding.go index e96a786e40..221fa6d3aa 100644 --- a/trie/encoding.go +++ b/trie/encoding.go @@ -83,7 +83,7 @@ func hexToKeybytes(hex []byte) []byte { if len(hex)&1 != 0 { panic("can't convert hex key of odd length") } - key := make([]byte, (len(hex)+1)/2) + key := make([]byte, len(hex)/2) decodeNibbles(hex, key) return key } diff --git a/trie/hasher.go b/trie/hasher.go index 2fc44787ac..ff61e70922 100644 --- a/trie/hasher.go +++ b/trie/hasher.go @@ -196,12 +196,12 @@ func (h *hasher) store(n node, db *Database, force bool) (node, error) { if h.onleaf != nil { switch n := n.(type) { case *shortNode: - if child, ok := n.Val.(valueNode); ok { + if child, ok := n.Val.(valueNode); ok && child != nil { h.onleaf(child, hash) } case *fullNode: for i := 0; i < 16; i++ { - if child, ok := n.Children[i].(valueNode); ok { + if child, ok := n.Children[i].(valueNode); ok && child != nil { h.onleaf(child, hash) } } diff --git a/trie/sync.go b/trie/sync.go index 4ae975d042..ccec80c9e3 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -68,19 +68,19 @@ func newSyncMemBatch() *syncMemBatch { } } -// TrieSync is the main state trie synchronisation scheduler, which provides yet +// Sync is the main state trie synchronisation scheduler, which provides yet // unknown trie hashes to retrieve, accepts node data associated with said hashes // and reconstructs the trie step by step until all is done. -type TrieSync struct { +type Sync struct { database DatabaseReader // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequest database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash queue *prque.Prque // Priority queue with the pending requests } -// NewTrieSync creates a new trie data download scheduler. -func NewTrieSync(root common.Hash, database DatabaseReader, callback LeafCallback) *TrieSync { - ts := &TrieSync{ +// NewSync creates a new trie data download scheduler. +func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync { + ts := &Sync{ database: database, membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), @@ -91,7 +91,7 @@ func NewTrieSync(root common.Hash, database DatabaseReader, callback LeafCallbac } // AddSubTrie registers a new trie to the sync code, rooted at the designated parent. -func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) { +func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) { // Short circuit if the trie is empty or already known if root == emptyRoot { return @@ -126,7 +126,7 @@ func (s *TrieSync) AddSubTrie(root common.Hash, depth int, parent common.Hash, c // interpreted as a trie node, but rather accepted and stored into the database // as is. This method's goal is to support misc state metadata retrievals (e.g. // contract code). -func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { +func (s *Sync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) { // Short circuit if the entry is empty or already known if hash == emptyState { return @@ -156,7 +156,7 @@ func (s *TrieSync) AddRawEntry(hash common.Hash, depth int, parent common.Hash) } // Missing retrieves the known missing nodes from the trie for retrieval. -func (s *TrieSync) Missing(max int) []common.Hash { +func (s *Sync) Missing(max int) []common.Hash { requests := []common.Hash{} for !s.queue.Empty() && (max == 0 || len(requests) < max) { requests = append(requests, s.queue.PopItem().(common.Hash)) @@ -167,7 +167,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { // Process injects a batch of retrieved trie nodes data, returning if something // was committed to the database and also the index of an entry if processing of // it failed. -func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { +func (s *Sync) Process(results []SyncResult) (bool, int, error) { committed := false for i, item := range results { @@ -213,7 +213,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { // Commit flushes the data stored in the internal membatch out to persistent // storage, returning the number of items written and any occurred error. -func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) { +func (s *Sync) Commit(dbw ethdb.Putter) (int, error) { // Dump the membatch into a database dbw for i, key := range s.membatch.order { if err := dbw.Put(key[:], s.membatch.batch[key]); err != nil { @@ -228,14 +228,14 @@ func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) { } // Pending returns the number of state entries currently pending for download. -func (s *TrieSync) Pending() int { +func (s *Sync) Pending() int { return len(s.requests) } // schedule inserts a new state retrieval request into the fetch queue. If there // is already a pending request for this node, the new request will be discarded // and only a parent reference added to the old one. -func (s *TrieSync) schedule(req *request) { +func (s *Sync) schedule(req *request) { // If we're already requesting this node, add a new reference and stop if old, ok := s.requests[req.hash]; ok { old.parents = append(old.parents, req.parents...) @@ -248,7 +248,7 @@ func (s *TrieSync) schedule(req *request) { // children retrieves all the missing children of a state trie entry for future // retrieval scheduling. -func (s *TrieSync) children(req *request, object node) ([]*request, error) { +func (s *Sync) children(req *request, object node) ([]*request, error) { // Gather all the children of the node, irrelevant whether known or not type child struct { node node @@ -310,7 +310,7 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { // commit finalizes a retrieval request and stores it into the membatch. If any // of the referencing parent requests complete due to this commit, they are also // committed themselves. -func (s *TrieSync) commit(req *request) (err error) { +func (s *Sync) commit(req *request) (err error) { // Write the node content to the membatch s.membatch.batch[req.hash] = req.data s.membatch.order = append(s.membatch.order, req.hash) diff --git a/trie/sync_test.go b/trie/sync_test.go index 142a6f5b1a..c76779e5c7 100644 --- a/trie/sync_test.go +++ b/trie/sync_test.go @@ -87,14 +87,14 @@ func checkTrieConsistency(db *Database, root common.Hash) error { } // Tests that an empty trie is not scheduled for syncing. -func TestEmptyTrieSync(t *testing.T) { +func TestEmptySync(t *testing.T) { dbA := NewDatabase(ethdb.NewMemDatabase()) dbB := NewDatabase(ethdb.NewMemDatabase()) emptyA, _ := New(common.Hash{}, dbA) emptyB, _ := New(emptyRoot, dbB) for i, trie := range []*Trie{emptyA, emptyB} { - if req := NewTrieSync(trie.Hash(), ethdb.NewMemDatabase(), nil).Missing(1); len(req) != 0 { + if req := NewSync(trie.Hash(), ethdb.NewMemDatabase(), nil).Missing(1); len(req) != 0 { t.Errorf("test %d: content requested for empty trie: %v", i, req) } } @@ -102,17 +102,17 @@ func TestEmptyTrieSync(t *testing.T) { // Tests that given a root hash, a trie can sync iteratively on a single thread, // requesting retrieval tasks and returning all of them in one go. -func TestIterativeTrieSyncIndividual(t *testing.T) { testIterativeTrieSync(t, 1) } -func TestIterativeTrieSyncBatched(t *testing.T) { testIterativeTrieSync(t, 100) } +func TestIterativeSyncIndividual(t *testing.T) { testIterativeSync(t, 1) } +func TestIterativeSyncBatched(t *testing.T) { testIterativeSync(t, 100) } -func testIterativeTrieSync(t *testing.T, batch int) { +func testIterativeSync(t *testing.T, batch int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := append([]common.Hash{}, sched.Missing(batch)...) for len(queue) > 0 { @@ -138,14 +138,14 @@ func testIterativeTrieSync(t *testing.T, batch int) { // Tests that the trie scheduler can correctly reconstruct the state even if only // partial results are returned, and the others sent only later. -func TestIterativeDelayedTrieSync(t *testing.T) { +func TestIterativeDelayedSync(t *testing.T) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := append([]common.Hash{}, sched.Missing(10000)...) for len(queue) > 0 { @@ -173,17 +173,17 @@ func TestIterativeDelayedTrieSync(t *testing.T) { // Tests that given a root hash, a trie can sync iteratively on a single thread, // requesting retrieval tasks and returning all of them in one go, however in a // random order. -func TestIterativeRandomTrieSyncIndividual(t *testing.T) { testIterativeRandomTrieSync(t, 1) } -func TestIterativeRandomTrieSyncBatched(t *testing.T) { testIterativeRandomTrieSync(t, 100) } +func TestIterativeRandomSyncIndividual(t *testing.T) { testIterativeRandomSync(t, 1) } +func TestIterativeRandomSyncBatched(t *testing.T) { testIterativeRandomSync(t, 100) } -func testIterativeRandomTrieSync(t *testing.T, batch int) { +func testIterativeRandomSync(t *testing.T, batch int) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) for _, hash := range sched.Missing(batch) { @@ -217,14 +217,14 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { // Tests that the trie scheduler can correctly reconstruct the state even if only // partial results are returned (Even those randomly), others sent only later. -func TestIterativeRandomDelayedTrieSync(t *testing.T) { +func TestIterativeRandomDelayedSync(t *testing.T) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := make(map[common.Hash]struct{}) for _, hash := range sched.Missing(10000) { @@ -264,14 +264,14 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { // Tests that a trie sync will not request nodes multiple times, even if they // have such references. -func TestDuplicateAvoidanceTrieSync(t *testing.T) { +func TestDuplicateAvoidanceSync(t *testing.T) { // Create a random trie to copy srcDb, srcTrie, srcData := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) queue := append([]common.Hash{}, sched.Missing(0)...) requested := make(map[common.Hash]struct{}) @@ -304,14 +304,14 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { // Tests that at any point in time during a sync, only complete sub-tries are in // the database. -func TestIncompleteTrieSync(t *testing.T) { +func TestIncompleteSync(t *testing.T) { // Create a random trie to copy srcDb, srcTrie, _ := makeTestTrie() // Create a destination trie and sync with the scheduler diskdb := ethdb.NewMemDatabase() triedb := NewDatabase(diskdb) - sched := NewTrieSync(srcTrie.Hash(), diskdb, nil) + sched := NewSync(srcTrie.Hash(), diskdb, nil) added := []common.Hash{} queue := append([]common.Hash{}, sched.Missing(1)...) diff --git a/trie/trie.go b/trie/trie.go index 31a404e3a0..30543c5496 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -101,7 +101,7 @@ func New(root common.Hash, db *Database) (*Trie, error) { db: db, originalRoot: root, } - if (root != common.Hash{}) && root != emptyRoot { + if root != (common.Hash{}) && root != emptyRoot { rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err diff --git a/whisper/shhclient/client.go b/whisper/shhclient/client.go index 8e7085a0a6..a814154e47 100644 --- a/whisper/shhclient/client.go +++ b/whisper/shhclient/client.go @@ -159,9 +159,9 @@ func (sc *Client) DeleteSymmetricKey(ctx context.Context, id string) error { } // Post a message onto the network. -func (sc *Client) Post(ctx context.Context, message whisper.NewMessage) error { - var ignored bool - return sc.c.CallContext(ctx, &ignored, "shh_post", message) +func (sc *Client) Post(ctx context.Context, message whisper.NewMessage) (string, error) { + var hash string + return hash, sc.c.CallContext(ctx, &hash, "shh_post", message) } // SubscribeMessages subscribes to messages that match the given criteria. This method