diff --git a/network/stream/v2/common_test.go b/network/stream/v2/common_test.go index 5f844846db..bb025b090f 100644 --- a/network/stream/v2/common_test.go +++ b/network/stream/v2/common_test.go @@ -57,8 +57,6 @@ var ( bucketKeyFileStore = "filestore" bucketKeyLocalStore = "localstore" bucketKeyInitialBinIndexes = "bin-indexes" - - simContextTimeout = 90 * time.Second ) func nodeRegistry(sim *simulation.Simulation, id enode.ID) (s *Registry) { diff --git a/network/stream/v2/cursors_test.go b/network/stream/v2/cursors_test.go index f0b0cd43ba..5e9bf40a73 100644 --- a/network/stream/v2/cursors_test.go +++ b/network/stream/v2/cursors_test.go @@ -61,61 +61,34 @@ func TestNodesExchangeCorrectBinIndexes(t *testing.T) { }) defer sim.Close() - ctx, cancel := context.WithTimeout(context.Background(), simContextTimeout) - defer cancel() _, err := sim.AddNodesAndConnectStar(nodeCount) if err != nil { t.Fatal(err) } - - getCursorsCopy := func(sim *simulation.Simulation, idOne, idOther enode.ID) map[string]uint64 { - r := nodeRegistry(sim, idOne) - if r == nil { - return nil - } - p := r.getPeer(idOther) - if p == nil { - return nil - } - return p.getCursorsCopy() + nodeIDs := sim.UpNodeIDs() + if len(nodeIDs) != nodeCount { + t.Error("not enough nodes up") } - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { - nodeIDs := sim.UpNodeIDs() - if len(nodeIDs) != nodeCount { - return errors.New("not enough nodes up") - } - - // periodically check for cursors - for i := 0; i < 100; i++ { - // wait for the nodes to exchange StreamInfo messages - time.Sleep(10 * time.Millisecond) + idOne := nodeIDs[0] + idOther := nodeIDs[1] - idOne := nodeIDs[0] - idOther := nodeIDs[1] - onesCursors := getCursorsCopy(sim, idOne, idOther) - othersCursors := getCursorsCopy(sim, idOther, idOne) + waitForCursors(t, sim, idOne, idOther, true) + waitForCursors(t, sim, idOther, idOne, true) - onesBins := nodeInitialBinIndexes(sim, idOne) - othersBins := nodeInitialBinIndexes(sim, idOther) + onesCursors := getCursorsCopy(sim, idOne, idOther) + othersCursors := getCursorsCopy(sim, idOther, idOne) - err1 := compareNodeBinsToStreams(t, onesCursors, othersBins) - if err1 != nil { - err = err1 // set the resulting error when the loop is done - } - err2 := compareNodeBinsToStreams(t, othersCursors, onesBins) - if err2 != nil { - err = err2 // set the resulting error when the loop is done - } - if err1 == nil && err2 == nil { - return nil - } - } + onesBins := nodeInitialBinIndexes(sim, idOne) + othersBins := nodeInitialBinIndexes(sim, idOther) - return err - }) - if result.Error != nil { - t.Fatal(result.Error) + err = compareNodeBinsToStreams(t, onesCursors, othersBins) + if err != nil { + t.Error(err) + } + err = compareNodeBinsToStreams(t, othersCursors, onesBins) + if err != nil { + t.Error(err) } } @@ -136,74 +109,56 @@ func TestNodesCorrectBinsDynamic(t *testing.T) { }) defer sim.Close() - ctx, cancel := context.WithTimeout(context.Background(), simContextTimeout) - defer cancel() _, err := sim.AddNodesAndConnectStar(2) if err != nil { t.Fatal(err) } - result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error { + nodeIDs := sim.UpNodeIDs() + if len(nodeIDs) != 2 { + t.Fatal("not enough nodes up") + } + + waitForCursors(t, sim, nodeIDs[0], nodeIDs[1], true) + waitForCursors(t, sim, nodeIDs[1], nodeIDs[0], true) + + for j := 2; j <= nodeCount; j++ { + // append a node to the simulation + id, err := sim.AddNodes(1) + if err != nil { + t.Fatal(err) + } + err = sim.Net.ConnectNodesStar(id, nodeIDs[0]) + if err != nil { + t.Fatal(err) + } nodeIDs := sim.UpNodeIDs() - if len(nodeIDs) != 2 { - return errors.New("not enough nodes up") + if len(nodeIDs) != j+1 { + t.Fatalf("not enough nodes up. got %d, want %d", len(nodeIDs), j+1) } + idPivot := nodeIDs[0] - // wait for the nodes to exchange StreamInfo messages - wantCursorsCount := 17 - for i := 499; i >= 0; i-- { // wait time 5s - time.Sleep(10 * time.Millisecond) - count1 := nodeRegistry(sim, nodeIDs[0]).getPeer(nodeIDs[1]).cursorsCount() - count2 := nodeRegistry(sim, nodeIDs[1]).getPeer(nodeIDs[0]).cursorsCount() - if count1 >= wantCursorsCount && count2 >= wantCursorsCount { - break - } - if i == 0 { - return fmt.Errorf("got cursors %v and %v, want %v", count1, count2, wantCursorsCount) - } - } + waitForCursors(t, sim, idPivot, nodeIDs[j], true) + waitForCursors(t, sim, nodeIDs[j], idPivot, true) - idPivot := nodeIDs[0] pivotSyncer := nodeRegistry(sim, idPivot) pivotKademlia := nodeKademlia(sim, idPivot) pivotDepth := uint(pivotKademlia.NeighbourhoodDepth()) - for j := 2; j <= nodeCount; j++ { - // append a node to the simulation - id, err := sim.AddNodes(1) - if err != nil { - return err - } - err = sim.Net.ConnectNodesStar(id, nodeIDs[0]) - if err != nil { - return err - } - nodeIDs := sim.UpNodeIDs() - if len(nodeIDs) != j+1 { - return fmt.Errorf("not enough nodes up. got %d, want %d", len(nodeIDs), j) - } - time.Sleep(50 * time.Millisecond) - idPivot = nodeIDs[0] - for i := 1; i < j; i++ { - idOther := nodeIDs[i] - otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia) - po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr()) - depth := pivotKademlia.NeighbourhoodDepth() - pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy() - - // check that the pivot node is interested just in bins >= depth - if po >= depth { - othersBins := nodeInitialBinIndexes(sim, idOther) - if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil { - return err - } + for i := 1; i < j; i++ { + idOther := nodeIDs[i] + otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia) + po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr()) + pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy() + + // check that the pivot node is interested just in bins >= depth + if po >= int(pivotDepth) { + othersBins := nodeInitialBinIndexes(sim, idOther) + if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil { + t.Error(err) } } } - return nil - }) - if result.Error != nil { - t.Fatal(result.Error) } } @@ -424,6 +379,19 @@ func waitForCursors(t *testing.T, sim *simulation.Simulation, pivotEnode, lookup } } +// getCursorsCopy returns cursors on node idOne for its peer idOther. +func getCursorsCopy(sim *simulation.Simulation, idOne, idOther enode.ID) map[string]uint64 { + r := nodeRegistry(sim, idOne) + if r == nil { + return nil + } + p := r.getPeer(idOther) + if p == nil { + return nil + } + return p.getCursorsCopy() +} + // compareNodeBinsToStreams checks that the values on `onesCursors` correlate to the values in `othersBins` // onesCursors represents the stream cursors that node A knows about node B (i.e. they shoud reflect directly in this case // the values which node B retrieved from its local store)