Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,12 @@ func (s *stateSync) loop() (err error) {
s.d.dropPeer(req.peer.id)
}
// Process all the received blobs and check for stale delivery
if err = s.process(req); err != nil {
delivered, err := s.process(req)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
}
req.peer.SetNodeDataIdle(len(req.response))
req.peer.SetNodeDataIdle(delivered)
}
}
return nil
Expand Down Expand Up @@ -398,9 +399,11 @@ func (s *stateSync) fillTasks(n int, req *stateReq) {
// process iterates over a batch of delivered state data, injecting each item
// into a running state sync, re-queuing any items that were requested but not
// delivered.
func (s *stateSync) process(req *stateReq) error {
// Returns whether the peer actually managed to deliver anything of value,
// and any error that occurred
func (s *stateSync) process(req *stateReq) (int, error) {
// Collect processing stats and update progress if valid data was received
duplicate, unexpected := 0, 0
duplicate, unexpected, successful := 0, 0, 0

defer func(start time.Time) {
if duplicate > 0 || unexpected > 0 {
Expand All @@ -410,20 +413,20 @@ func (s *stateSync) process(req *stateReq) error {

// Iterate over all the delivered data and inject one-by-one into the trie
progress := false

for _, blob := range req.response {
prog, hash, err := s.processNodeData(blob)
switch err {
case nil:
s.numUncommitted++
s.bytesUncommitted += len(blob)
progress = progress || prog
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
Expand All @@ -441,12 +444,12 @@ func (s *stateSync) process(req *stateReq) error {
// If we've requested the node too many times already, it may be a malicious
// sync where nobody has the right data. Abort.
if len(task.attempts) >= npeers {
return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
// Missing item, place into the retry queue.
s.tasks[hash] = task
}
return nil
return successful, nil
}

// processNodeData tries to inject a trie node data blob delivered from a remote
Expand Down