Skip to content
This repository has been archived by the owner on Dec 7, 2019. It is now read-only.

Commit

Permalink
Merge pull request #39 from libp2p/fix/tpt-tests-for-quic
Browse files Browse the repository at this point in the history
fix transport tests for quic
  • Loading branch information
Stebalien authored Feb 8, 2019
2 parents 8f3a996 + 22414bc commit f3b84a9
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 51 deletions.
100 changes: 68 additions & 32 deletions test/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,52 +277,83 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb tpt.Transport, maddr ma.Multia
defer l.Close()

count := 10000
go func() {
c, err := l.Accept()
checkErr(t, err)
stress := func() {
for i := 0; i < count; i++ {
s, err := c.OpenStream()
if err != nil {
panic(err)
}
fullClose(t, s)
}
}
workers := 5

go stress()
go stress()
go stress()
go stress()
go stress()
}()
var (
connA, connB tpt.Conn
)

b, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
accepted := make(chan error, 1)
go func() {
var err error
connA, err = l.Accept()
accepted <- err
}()
connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
checkErr(t, <-accepted)

time.Sleep(time.Millisecond * 50)
defer func() {
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()

recv := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
str, err := b.AcceptStream()
defer wg.Done()
for j := 0; j < workers; j++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
s, err := connA.OpenStream()
if err != nil {
t.Error(err)
return
}
wg.Add(1)
go func() {
defer wg.Done()
fullClose(t, s)
}()
}
}()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count*workers; i++ {
str, err := connB.AcceptStream()
if err != nil {
break
}
wg.Add(1)
go func() {
recv <- struct{}{}
defer wg.Done()
fullClose(t, str)
}()
}
}()

limit := time.After(time.Second * 10)
for i := 0; i < count*5; i++ {
select {
case <-recv:
case <-limit:
t.Fatal("timed out receiving streams")
}
timeout := time.After(time.Second * 10)
done := make(chan struct{})

go func() {
wg.Wait()
close(done)
}()

select {
case <-timeout:
t.Fatal("timed out receiving streams")
case <-done:
}
}

Expand All @@ -339,9 +370,14 @@ func SubtestStreamReset(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr,
if err != nil {
panic(err)
}

// Some transports won't open the stream until we write. That's
// fine.
s.Write([]byte("foo"))

time.Sleep(time.Millisecond * 50)

_, err = s.Write([]byte("foo"))
_, err = s.Write([]byte("bar"))
if err == nil {
t.Error("should have failed to write")
}
Expand Down
62 changes: 43 additions & 19 deletions test/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"sync"
"testing"
Expand Down Expand Up @@ -63,52 +62,70 @@ func SubtestBasic(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA
}
defer list.Close()

done := make(chan struct{})
var (
connA, connB tpt.Conn
done = make(chan struct{})
)
defer func() {
<-done
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()

go func() {
defer close(done)
c, err := list.Accept()
var err error
connB, err = list.Accept()
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
s, err := c.AcceptStream()
s, err := connB.AcceptStream()
if err != nil {
c.Close()
t.Fatal(err)
t.Error(err)
return
}

buf := make([]byte, len(testData))
_, err = io.ReadFull(s, buf)
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}

if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}

n, err := s.Write(testData)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
s.Close()

if n != len(testData) {
t.Fatal(err)
t.Error(err)
return
}

err = s.Close()
if err != nil {
t.Error(err)
}
}()

if !tb.CanDial(list.Multiaddr()) {
t.Error("CanDial should have returned true")
}

c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
connA, err = tb.Dial(ctx, list.Multiaddr(), peerA)
if err != nil {
t.Fatal(err)
}
defer c.Close()

s, err := c.OpenStream()
s, err := connA.OpenStream()
if err != nil {
t.Fatal(err)
}
Expand All @@ -123,13 +140,20 @@ func SubtestBasic(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA
t.Fatalf("failed to write enough data (a->b)")
return
}
err = s.Close()
if err != nil {
t.Fatal(err)
return
}

buf := make([]byte, len(testData))
_, err = io.ReadFull(s, buf)
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
return
}
if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}
}

func SubtestPingPong(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID) {
Expand Down

0 comments on commit f3b84a9

Please sign in to comment.