Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions network/stream/v2/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ var (
bucketKeyFileStore = "filestore"
bucketKeyLocalStore = "localstore"
bucketKeyInitialBinIndexes = "bin-indexes"

simContextTimeout = 90 * time.Second
)

func nodeRegistry(sim *simulation.Simulation, id enode.ID) (s *Registry) {
Expand Down
162 changes: 65 additions & 97 deletions network/stream/v2/cursors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,61 +61,34 @@ func TestNodesExchangeCorrectBinIndexes(t *testing.T) {
})
defer sim.Close()

ctx, cancel := context.WithTimeout(context.Background(), simContextTimeout)
defer cancel()
_, err := sim.AddNodesAndConnectStar(nodeCount)
if err != nil {
t.Fatal(err)
}

getCursorsCopy := func(sim *simulation.Simulation, idOne, idOther enode.ID) map[string]uint64 {
r := nodeRegistry(sim, idOne)
if r == nil {
return nil
}
p := r.getPeer(idOther)
if p == nil {
return nil
}
return p.getCursorsCopy()
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != nodeCount {
t.Error("not enough nodes up")
}

result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != nodeCount {
return errors.New("not enough nodes up")
}

// periodically check for cursors
for i := 0; i < 100; i++ {
// wait for the nodes to exchange StreamInfo messages
time.Sleep(10 * time.Millisecond)
idOne := nodeIDs[0]
idOther := nodeIDs[1]

idOne := nodeIDs[0]
idOther := nodeIDs[1]
onesCursors := getCursorsCopy(sim, idOne, idOther)
othersCursors := getCursorsCopy(sim, idOther, idOne)
waitForCursors(t, sim, idOne, idOther, true)
waitForCursors(t, sim, idOther, idOne, true)

onesBins := nodeInitialBinIndexes(sim, idOne)
othersBins := nodeInitialBinIndexes(sim, idOther)
onesCursors := getCursorsCopy(sim, idOne, idOther)
othersCursors := getCursorsCopy(sim, idOther, idOne)

err1 := compareNodeBinsToStreams(t, onesCursors, othersBins)
if err1 != nil {
err = err1 // set the resulting error when the loop is done
}
err2 := compareNodeBinsToStreams(t, othersCursors, onesBins)
if err2 != nil {
err = err2 // set the resulting error when the loop is done
}
if err1 == nil && err2 == nil {
return nil
}
}
onesBins := nodeInitialBinIndexes(sim, idOne)
othersBins := nodeInitialBinIndexes(sim, idOther)

return err
})
if result.Error != nil {
t.Fatal(result.Error)
err = compareNodeBinsToStreams(t, onesCursors, othersBins)
if err != nil {
t.Error(err)
}
err = compareNodeBinsToStreams(t, othersCursors, onesBins)
if err != nil {
t.Error(err)
}
}

Expand All @@ -136,74 +109,56 @@ func TestNodesCorrectBinsDynamic(t *testing.T) {
})
defer sim.Close()

ctx, cancel := context.WithTimeout(context.Background(), simContextTimeout)
defer cancel()
_, err := sim.AddNodesAndConnectStar(2)
if err != nil {
t.Fatal(err)
}

result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != 2 {
t.Fatal("not enough nodes up")
}

waitForCursors(t, sim, nodeIDs[0], nodeIDs[1], true)
waitForCursors(t, sim, nodeIDs[1], nodeIDs[0], true)

for j := 2; j <= nodeCount; j++ {
// append a node to the simulation
id, err := sim.AddNodes(1)
if err != nil {
t.Fatal(err)
}
err = sim.Net.ConnectNodesStar(id, nodeIDs[0])
if err != nil {
t.Fatal(err)
}
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != 2 {
return errors.New("not enough nodes up")
if len(nodeIDs) != j+1 {
t.Fatalf("not enough nodes up. got %d, want %d", len(nodeIDs), j+1)
}
idPivot := nodeIDs[0]

// wait for the nodes to exchange StreamInfo messages
wantCursorsCount := 17
for i := 499; i >= 0; i-- { // wait time 5s
time.Sleep(10 * time.Millisecond)
count1 := nodeRegistry(sim, nodeIDs[0]).getPeer(nodeIDs[1]).cursorsCount()
count2 := nodeRegistry(sim, nodeIDs[1]).getPeer(nodeIDs[0]).cursorsCount()
if count1 >= wantCursorsCount && count2 >= wantCursorsCount {
break
}
if i == 0 {
return fmt.Errorf("got cursors %v and %v, want %v", count1, count2, wantCursorsCount)
}
}
waitForCursors(t, sim, idPivot, nodeIDs[j], true)
waitForCursors(t, sim, nodeIDs[j], idPivot, true)

idPivot := nodeIDs[0]
pivotSyncer := nodeRegistry(sim, idPivot)
pivotKademlia := nodeKademlia(sim, idPivot)
pivotDepth := uint(pivotKademlia.NeighbourhoodDepth())

for j := 2; j <= nodeCount; j++ {
// append a node to the simulation
id, err := sim.AddNodes(1)
if err != nil {
return err
}
err = sim.Net.ConnectNodesStar(id, nodeIDs[0])
if err != nil {
return err
}
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != j+1 {
return fmt.Errorf("not enough nodes up. got %d, want %d", len(nodeIDs), j)
}
time.Sleep(50 * time.Millisecond)
idPivot = nodeIDs[0]
for i := 1; i < j; i++ {
idOther := nodeIDs[i]
otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia)
po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr())
depth := pivotKademlia.NeighbourhoodDepth()
pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy()

// check that the pivot node is interested just in bins >= depth
if po >= depth {
othersBins := nodeInitialBinIndexes(sim, idOther)
if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil {
return err
}
for i := 1; i < j; i++ {
idOther := nodeIDs[i]
otherKademlia := sim.MustNodeItem(idOther, simulation.BucketKeyKademlia).(*network.Kademlia)
po := chunk.Proximity(otherKademlia.BaseAddr(), pivotKademlia.BaseAddr())
pivotCursors := pivotSyncer.getPeer(idOther).getCursorsCopy()

// check that the pivot node is interested just in bins >= depth
if po >= int(pivotDepth) {
othersBins := nodeInitialBinIndexes(sim, idOther)
if err := compareNodeBinsToStreamsWithDepth(t, pivotCursors, othersBins, pivotDepth); err != nil {
t.Error(err)
}
}
}
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
}

Expand Down Expand Up @@ -424,6 +379,19 @@ func waitForCursors(t *testing.T, sim *simulation.Simulation, pivotEnode, lookup
}
}

// getCursorsCopy returns cursors on node idOne for its peer idOther.
func getCursorsCopy(sim *simulation.Simulation, idOne, idOther enode.ID) map[string]uint64 {
r := nodeRegistry(sim, idOne)
if r == nil {
return nil
}
p := r.getPeer(idOther)
if p == nil {
return nil
}
return p.getCursorsCopy()
}

// compareNodeBinsToStreams checks that the values on `onesCursors` correlate to the values in `othersBins`
// onesCursors represents the stream cursors that node A knows about node B (i.e. they shoud reflect directly in this case
// the values which node B retrieved from its local store)
Expand Down