Skip to content

Commit

Permalink
client/eth: Check provider header times.
Browse files Browse the repository at this point in the history
When fetching a new or cached header with a provider, do a basic check
on the header's time to determine if the header, and so the provider,
are up to date.
  • Loading branch information
JoeGruffins committed Jan 25, 2023
1 parent 69ace7b commit f6087f9
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 82 deletions.
12 changes: 4 additions & 8 deletions client/asset/eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type ethFetcher interface {
sendSignedTransaction(ctx context.Context, tx *types.Transaction) error
sendTransaction(ctx context.Context, txOpts *bind.TransactOpts, to common.Address, data []byte) (*types.Transaction, error)
signData(data []byte) (sig, pubKey []byte, err error)
syncProgress(context.Context) (*ethereum.SyncProgress, error)
syncProgress(context.Context) (progress *ethereum.SyncProgress, bestHeaderUNIXTime uint64, err error)
transactionConfirmations(context.Context, common.Hash) (uint32, error)
getTransaction(context.Context, common.Hash) (*types.Transaction, int64, error)
txOpts(ctx context.Context, val, maxGas uint64, maxFeeRate, nonce *big.Int) (*bind.TransactOpts, error)
Expand Down Expand Up @@ -2798,18 +2798,14 @@ func (*baseWallet) ValidateSecret(secret, secretHash []byte) bool {
// more, requesting the best block header starts to fail after a few tries
// during initial sync. Investigate how to get correct sync progress.
func (eth *baseWallet) SyncStatus() (bool, float32, error) {
prog, err := eth.node.syncProgress(eth.ctx)
prog, bestHeaderUNIXTime, err := eth.node.syncProgress(eth.ctx)
if err != nil {
return false, 0, err
}
checkHeaderTime := func() (bool, error) {
bh, err := eth.node.bestHeader(eth.ctx)
if err != nil {
return false, err
}
// Time in the header is in seconds.
timeDiff := time.Now().Unix() - int64(bh.Time)
if timeDiff > dexeth.MaxBlockInterval && eth.net != dex.Simnet {
timeDiff := time.Now().Unix() - int64(bestHeaderUNIXTime)
if timeDiff > dexeth.MaxBlockInterval {
eth.log.Infof("Time since last eth block (%d sec) exceeds %d sec."+
"Assuming not in sync. Ensure your computer's system clock "+
"is correct.", timeDiff, dexeth.MaxBlockInterval)
Expand Down
94 changes: 48 additions & 46 deletions client/asset/eth/eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,43 +91,45 @@ type tGetTxRes struct {
}

type testNode struct {
acct *accounts.Account
addr common.Address
connectErr error
bestHdr *types.Header
bestHdrErr error
syncProg ethereum.SyncProgress
bal *big.Int
balErr error
signDataErr error
privKey *ecdsa.PrivateKey
swapVers map[uint32]struct{} // For SwapConfirmations -> swap. TODO for other contractor methods
swapMap map[[32]byte]*dexeth.SwapState
refundable bool
baseFee *big.Int
tip *big.Int
netFeeStateErr error
confNonce uint64
confNonceErr error
getTxRes *types.Transaction
getTxResMap map[common.Hash]*tGetTxRes
getTxHeight int64
getTxErr error
receipt *types.Receipt
receiptTx *types.Transaction
receiptErr error
hdrByHash *types.Header
txReceipt *types.Receipt
lastSignedTx *types.Transaction
sendTxTx *types.Transaction
sendTxErr error
simBackend bind.ContractBackend
maxFeeRate *big.Int
pendingTxs []*types.Transaction
tContractor *tContractor
tokenContractor *tTokenContractor
contractor contractor
tokenParent *assetWallet // only set for tokens
acct *accounts.Account
addr common.Address
connectErr error
bestHdr *types.Header
bestHdrErr error
syncProg ethereum.SyncProgress
syncProgBestHeaderUNIXTime uint64
syncProgErr error
bal *big.Int
balErr error
signDataErr error
privKey *ecdsa.PrivateKey
swapVers map[uint32]struct{} // For SwapConfirmations -> swap. TODO for other contractor methods
swapMap map[[32]byte]*dexeth.SwapState
refundable bool
baseFee *big.Int
tip *big.Int
netFeeStateErr error
confNonce uint64
confNonceErr error
getTxRes *types.Transaction
getTxResMap map[common.Hash]*tGetTxRes
getTxHeight int64
getTxErr error
receipt *types.Receipt
receiptTx *types.Transaction
receiptErr error
hdrByHash *types.Header
txReceipt *types.Receipt
lastSignedTx *types.Transaction
sendTxTx *types.Transaction
sendTxErr error
simBackend bind.ContractBackend
maxFeeRate *big.Int
pendingTxs []*types.Transaction
tContractor *tContractor
tokenContractor *tTokenContractor
contractor contractor
tokenParent *assetWallet // only set for tokens
}

func newBalance(current, in, out uint64) *Balance {
Expand Down Expand Up @@ -206,8 +208,8 @@ func (n *testNode) lock() error {
func (n *testNode) locked() bool {
return false
}
func (n *testNode) syncProgress(context.Context) (*ethereum.SyncProgress, error) {
return &n.syncProg, nil
func (n *testNode) syncProgress(context.Context) (prog *ethereum.SyncProgress, bestBlockUNIXTime uint64, err error) {
return &n.syncProg, n.syncProgBestHeaderUNIXTime, n.syncProgErr
}
func (n *testNode) peerCount() uint32 {
return 1
Expand Down Expand Up @@ -479,8 +481,8 @@ func TestSyncStatus(t *testing.T) {
tests := []struct {
name string
syncProg ethereum.SyncProgress
syncProgErr error
subSecs uint64
bestHdrErr error
wantErr, wantSynced bool
wantRatio float32
}{{
Expand All @@ -506,22 +508,22 @@ func TestSyncStatus(t *testing.T) {
},
subSecs: dexeth.MaxBlockInterval + 1,
}, {
name: "best header error",
bestHdrErr: errors.New(""),
name: "sync progress error",
syncProg: ethereum.SyncProgress{
CurrentBlock: 25,
HighestBlock: 0,
},
wantErr: true,
syncProgErr: errors.New(""),
wantErr: true,
}}

for _, test := range tests {
nowInSecs := uint64(time.Now().Unix())
ctx, cancel := context.WithCancel(context.Background())
node := &testNode{
syncProg: test.syncProg,
bestHdr: &types.Header{Time: nowInSecs - test.subSecs},
bestHdrErr: test.bestHdrErr,
syncProg: test.syncProg,
syncProgBestHeaderUNIXTime: nowInSecs - test.subSecs,
syncProgErr: test.syncProgErr,
}
eth := &baseWallet{
node: node,
Expand Down
28 changes: 20 additions & 8 deletions client/asset/eth/multirpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (p *provider) cachedTip() *types.Header {

p.tip.RLock()
defer p.tip.RUnlock()
if time.Since(p.tip.failStamp) < failQuarantine || time.Since(p.tip.headerStamp) > stale {
if time.Since(p.tip.failStamp) < failQuarantine ||
time.Since(p.tip.headerStamp) > stale ||
time.Now().Unix()-int64(p.tip.header.Time) > dexeth.MaxBlockInterval {
return nil
}
return p.tip.header
Expand All @@ -128,7 +130,8 @@ func (p *provider) setFailed() {
p.tip.Unlock()
}

// failed will be true if setFailed has been called in the last failQuarantine.
// failed will be true if setFailed has been called in the last failQuarantine
// or the cached tip is old.
func (p *provider) failed() bool {
p.tip.Lock()
defer p.tip.Unlock()
Expand All @@ -142,16 +145,23 @@ func (p *provider) bestHeader(ctx context.Context, log dex.Logger) (*types.Heade
defer cancel()
// Check if we have a cached header.
if tip := p.cachedTip(); tip != nil {
log.Tracef("using cached header from %q", p.host)
log.Tracef("Using cached header from %q", p.host)
return tip, nil
}

log.Tracef("fetching fresh header from %q", p.host)
log.Tracef("Fetching fresh header from %q", p.host)
hdr, err := p.ec.HeaderByNumber(ctx, nil /* latest */)
if err != nil {
p.setFailed()
return nil, fmt.Errorf("HeaderByNumber error: %w", err)
}
timeDiff := time.Now().Unix() - int64(p.tip.header.Time)
if timeDiff > dexeth.MaxBlockInterval {
p.setFailed()
return nil, fmt.Errorf("time since last eth block (%d sec) exceeds %d sec."+
"Assuming provider %s is not in sync. Ensure your computer's system clock "+
"is correct.", timeDiff, dexeth.MaxBlockInterval, p.host)
}
p.setTip(hdr)
return hdr, nil
}
Expand Down Expand Up @@ -1031,14 +1041,16 @@ func (m *multiRPCClient) signData(data []byte) (sig, pubKey []byte, err error) {
return signData(m.creds, data)
}

// syncProgress: We're going to lie and just always say we're synced if we
// can get a header.
func (m *multiRPCClient) syncProgress(ctx context.Context) (prog *ethereum.SyncProgress, err error) {
return prog, m.withAny(func(p *provider) error {
// syncProgress: Current and Highest blocks are not very useful for the caller,
// but the best header's time in seconds can be used to determine if the
// provider is out of sync.
func (m *multiRPCClient) syncProgress(ctx context.Context) (prog *ethereum.SyncProgress, bestHeaderUNIXTime uint64, err error) {
return prog, bestHeaderUNIXTime, m.withAny(func(p *provider) error {
tip, err := p.bestHeader(ctx, m.log)
if err != nil {
return err
}
bestHeaderUNIXTime = tip.Time

prog = &ethereum.SyncProgress{
CurrentBlock: tip.Number.Uint64(),
Expand Down
9 changes: 9 additions & 0 deletions client/asset/eth/multirpc_live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ func testEndpoint(endpoints []string, syncBlocks uint64, tFunc func(context.Cont
return nil
}

func TestMain(m *testing.M) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Tests will fail if the best block header is too old.
time.Sleep(time.Second)
mine(ctx)
time.Sleep(time.Second)
}

func TestHTTP(t *testing.T) {
if err := testEndpoint([]string{"http://localhost:" + deltaHTTPPort}, 2, nil); err != nil {
t.Fatal(err)
Expand Down
11 changes: 8 additions & 3 deletions client/asset/eth/nodeclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,15 @@ func (n *nodeClient) sendTransaction(ctx context.Context, txOpts *bind.TransactO
return tx, n.leth.ApiBackend.SendTx(ctx, tx)
}

// syncProgress return the current sync progress. Returns no error and nil when not syncing.
func (n *nodeClient) syncProgress(_ context.Context) (*ethereum.SyncProgress, error) {
// syncProgress return the current sync progress and the best block's header
// time in seconds. Returns no error and nil when not syncing.
func (n *nodeClient) syncProgress(ctx context.Context) (*ethereum.SyncProgress, uint64, error) {
hdr, err := n.bestHeader(ctx)
if err != nil {
return nil, 0, err
}
p := n.leth.ApiBackend.SyncProgress()
return &p, nil
return &p, hdr.Time, nil
}

// signData uses the private key of the address to sign a piece of data.
Expand Down
38 changes: 21 additions & 17 deletions client/asset/eth/nodeclient_harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,18 @@ func runSimnet(m *testing.M) (int, error) {

ethSwapContractAddr = dexeth.ContractAddresses[0][dex.Simnet]

// Tests will fail if the best block header is too old.
cmd := exec.CommandContext(ctx, "./mine-alpha", "2")
homeDir, err := os.UserHomeDir()
if err != nil {
return 1, err
}
harnessCtlDir := filepath.Join(homeDir, "dextest", "eth", "harness-ctl")
cmd.Dir = harnessCtlDir
if _, err = cmd.CombinedOutput(); err != nil {
return 1, fmt.Errorf("unexpected error while waiting to mine some blocks: %v", err)
}

initiatorRPC, participantRPC := rpcEndpoints(dex.Simnet)

err = setupWallet(simnetWalletDir, simnetWalletSeed, "localhost:30355", initiatorRPC, dex.Simnet)
Expand Down Expand Up @@ -439,8 +451,6 @@ func runSimnet(m *testing.M) (int, error) {
}

// Fund the wallets.
homeDir, _ := os.UserHomeDir()
harnessCtlDir := filepath.Join(homeDir, "dextest", "eth", "harness-ctl")
send := func(exe, addr, amt string) error {
cmd := exec.CommandContext(ctx, exe, addr, amt)
cmd.Dir = harnessCtlDir
Expand All @@ -464,7 +474,7 @@ func runSimnet(m *testing.M) (int, error) {
}
}

cmd := exec.CommandContext(ctx, "./mine-alpha", "1")
cmd = exec.CommandContext(ctx, "./mine-alpha", "1")
cmd.Dir = harnessCtlDir
if err := cmd.Run(); err != nil {
return 1, fmt.Errorf("error mining block after funding wallets")
Expand Down Expand Up @@ -738,22 +748,13 @@ func syncClient(cl ethFetcher) error {
if err := ctx.Err(); err != nil {
return err
}
prog, err := cl.syncProgress(ctx)
prog, bestHeaderUNIXTime, err := cl.syncProgress(ctx)
if err != nil {
return err
}
if isTestnet {
if prog.HighestBlock == 0 {
bh, err := cl.bestHeader(ctx)
if err != nil {
return err
}
// Time in the header is in seconds.
timeDiff := time.Now().Unix() - int64(bh.Time)
if timeDiff < dexeth.MaxBlockInterval {
return nil
}
} else if prog.CurrentBlock >= prog.HighestBlock {
timeDiff := time.Now().Unix() - int64(bestHeaderUNIXTime)
if timeDiff < dexeth.MaxBlockInterval {
return nil
}
} else {
Expand Down Expand Up @@ -1112,7 +1113,7 @@ func testSwap(t *testing.T, assetID uint32) {
}

func testSyncProgress(t *testing.T) {
p, err := ethClient.syncProgress(ctx)
p, _, err := ethClient.syncProgress(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2034,7 +2035,10 @@ func testRefund(t *testing.T, assetID uint32) {
t.Fatalf("%s: pre-redeem mining error: %v", test.name, err)
}

txOpts, _ = participantEthClient.txOpts(ctx, 0, gases.RedeemN(1), nil, nil)
txOpts, err = participantEthClient.txOpts(ctx, 0, gases.RedeemN(1), nil, nil)
if err != nil {
t.Fatalf("%s: txOpts error: %v", test.name, err)
}
_, err := pc.redeem(txOpts, []*asset.Redemption{newRedeem(secret, secretHash)})
if err != nil {
t.Fatalf("%s: redeem error: %v", test.name, err)
Expand Down

0 comments on commit f6087f9

Please sign in to comment.