Skip to content

Commit

Permalink
feat: run benchmarks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmcgee committed Oct 2, 2023
1 parent 4be38ef commit 53e7f3f
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 85 deletions.
92 changes: 48 additions & 44 deletions pkg/blob/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var sizes = []bytesize.ByteSize{
32 << 20,
64 << 20,
128 << 20,
256 << 20,
512 << 20,
1 << 30,
}

func blobServer(s *server.Server, t test.TestingT) (*grpc.Server, net.Listener) {
Expand Down Expand Up @@ -78,42 +78,44 @@ func BenchmarkBlobService_Put(b *testing.B) {
for _, size := range sizes {
size := size
b.Run(size.String(), func(b *testing.B) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

r := bytes.NewReader(data)
b.SetBytes(int64(size))
b.ReportAllocs()
b.ResetTimer()

sendBuf := make([]byte, (16*1024*1024)-5)
b.RunParallel(func(p *testing.PB) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

for i := 0; i < b.N; i++ {
r.Reset(data)
r := bytes.NewReader(data)

put, err := client.Put(context.Background())
if err != nil {
b.Fatal(err)
}
sendBuf := make([]byte, (16*1024*1024)-5)

for {
if n, err := r.Read(sendBuf); err != nil {
if err == io.EOF {
break
} else {
for p.Next() {
r.Reset(data)

put, err := client.Put(context.Background())
if err != nil {
b.Fatal(err)
}

for {
if n, err := r.Read(sendBuf); err != nil {
if err == io.EOF {
break
} else {
b.Fatal(err)
}
} else if err = put.Send(&pb.BlobChunk{Data: sendBuf[:n]}); err != nil {
b.Fatal(err)
}
} else if err = put.Send(&pb.BlobChunk{Data: sendBuf[:n]}); err != nil {
b.Fatal(err)
}
}

if _, err = put.CloseAndRecv(); err != nil {
b.Fatal(err)
if _, err = put.CloseAndRecv(); err != nil {
b.Fatal(err)
}
}

}
})
})
}
}
Expand Down Expand Up @@ -165,31 +167,33 @@ func BenchmarkBlobService_Read(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf := bytes.NewBuffer(nil)
b.RunParallel(func(p *testing.PB) {
for p.Next() {
buf := bytes.NewBuffer(nil)

read, err := client.Read(context.Background(), &pb.ReadBlobRequest{Digest: resp.Digest})
if err != nil {
b.Fatal(err)
}

for {
chunk, err := read.Recv()
if err == io.EOF {
break
} else if err != nil {
b.Fatal(err)
}
_, err = buf.Write(chunk.Data)
read, err := client.Read(context.Background(), &pb.ReadBlobRequest{Digest: resp.Digest})
if err != nil {
b.Fatal(err)
}
}

if buf.Len() != len(data) {
b.Fatalf("Received %v bytes, expected %v", buf.Len(), len(data))
for {
chunk, err := read.Recv()
if err == io.EOF {
break
} else if err != nil {
b.Fatal(err)
}
_, err = buf.Write(chunk.Data)
if err != nil {
b.Fatal(err)
}
}

if buf.Len() != len(data) {
b.Fatalf("Received %v bytes, expected %v", buf.Len(), len(data))
}
}
}
})
})
}
}
Expand Down
66 changes: 35 additions & 31 deletions pkg/store/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ var sizes = []bytesize.ByteSize{
32 << 20,
64 << 20,
128 << 20,
512 << 20,
1 << 30,
256 << 20,
}

func BenchmarkCdcStore_Put(b *testing.B) {
Expand All @@ -116,21 +115,24 @@ func BenchmarkCdcStore_Put(b *testing.B) {
for _, size := range sizes {
size := size
b.Run(size.String(), func(b *testing.B) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

r := bytes.NewReader(data)
b.SetBytes(int64(size))
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
r.Reset(data)
if _, err := store.Put(io.NopCloser(r), context.Background()); err != nil {
b.Fatal(err)
b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

r := bytes.NewReader(data)

for pb.Next() {
r.Reset(data)
if _, err := store.Put(io.NopCloser(r), context.Background()); err != nil {
b.Fatal(err)
}
}
}
})
})
}
}
Expand Down Expand Up @@ -167,26 +169,28 @@ func BenchmarkCdcStore_Get(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
reader, err := store.Get(*digest, context.Background())
if err != nil {
b.Fatal(err)
}

getData, err := io.ReadAll(reader)
if err != nil {
b.Fatal(err)
}

err = reader.Close()
if err != nil {
b.Fatal(err)
}

if len(getData) != len(data) {
b.Fatalf("expected %v bytes, received %v", len(data), len(getData))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reader, err := store.Get(*digest, context.Background())
if err != nil {
b.Fatal(err)
}

getData, err := io.ReadAll(reader)
if err != nil {
b.Fatal(err)
}

err = reader.Close()
if err != nil {
b.Fatal(err)
}

if len(getData) != len(data) {
b.Fatalf("expected %v bytes, received %v", len(data), len(getData))
}
}
}
})
})
}
}
108 changes: 98 additions & 10 deletions pkg/store/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math/rand"
"testing"

"github.com/nats-io/nuid"

"github.com/brianmcgee/nvix/pkg/test"
"github.com/inhies/go-bytesize"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -73,22 +75,108 @@ func BenchmarkNatsStore_Put(b *testing.B) {
for _, size := range natsStoreSizes {
size := size
b.Run(fmt.Sprintf("%s-%v", streamConfig.Name, size), func(b *testing.B) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)
b.SetBytes(int64(size))
b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

r := bytes.NewReader(data)

for pb.Next() {
r.Reset(data)
if err := store.Put(nuid.Next(), io.NopCloser(r), context.Background()); err != nil {
b.Fatal(err)
}
}
})
})
}
}
}

r := bytes.NewReader(data)
func BenchmarkNatsStore_Get(b *testing.B) {
s := test.RunBasicJetStreamServer(b)
defer test.ShutdownJSServerAndRemoveStorage(b, s)

conn, js := test.JsClient(b, s)

js, err := conn.JetStream()
if err != nil {
b.Fatal(err)
}

storageTypes := []nats.StorageType{
nats.FileStorage,
nats.MemoryStorage,
}

streamConfig := nats.StreamConfig{
Replicas: 1,
Discard: nats.DiscardOld,
MaxMsgsPerSubject: 1,
Storage: nats.FileStorage,
AllowRollup: true,
AllowDirect: true,
}

for _, storage := range storageTypes {

subjectPrefix := fmt.Sprintf("STORE.%v", storage)

streamConfig.Name = storage.String()
streamConfig.Subjects = []string{subjectPrefix + ".*"}
streamConfig.Storage = storage

if _, err := js.AddStream(&streamConfig); err != nil {
b.Fatal(err)
}

store := NatsStore{
Conn: conn,
StreamConfig: &streamConfig,
SubjectPrefix: subjectPrefix,
}

for _, size := range natsStoreSizes {
size := size

rng := rand.New(rand.NewSource(1))
data := make([]byte, size)
rng.Read(data)

r := bytes.NewReader(data)

key := fmt.Sprintf("key-%d", int(size))
if err := store.Put(key, io.NopCloser(r), context.Background()); err != nil {
b.Fatal(err)
}

b.Run(fmt.Sprintf("%s-%v", streamConfig.Name, size), func(b *testing.B) {
b.SetBytes(int64(size))
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
r.Reset(data)
key := fmt.Sprintf("%d-key-%d", int(size), i)
if err := store.Put(key, io.NopCloser(r), context.Background()); err != nil {
b.Fatal(err)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
reader, err := store.Get(key, context.Background())
if err != nil {
b.Fatal(err)
}

getData, err := io.ReadAll(reader)
if err != nil {
b.Fatal(err)
}

if len(getData) != len(data) {
b.Fatalf("expected %d bytes, received %b", len(data), len(getData))
}
}
}
})
})
}
}
Expand Down

0 comments on commit 53e7f3f

Please sign in to comment.