Skip to content

Commit

Permalink
feat: performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmcgee committed Oct 2, 2023
1 parent d856f53 commit 4be38ef
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 77 deletions.
6 changes: 3 additions & 3 deletions pkg/blob/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type service struct {

func (s *service) Stat(ctx context.Context, request *capb.StatBlobRequest) (*capb.BlobMeta, error) {
digest := store.Digest(request.Digest)
ok, err := s.store.Stat(digest.String(), ctx)
ok, err := s.store.Stat(digest, ctx)
if err != nil {
return nil, err
}
Expand All @@ -62,7 +62,7 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re
defer cancel()

digest := store.Digest(request.Digest)
reader, err := s.store.Get(digest.String(), ctx)
reader, err := s.store.Get(digest, ctx)

if err == store.ErrKeyNotFound {
return status.Errorf(codes.NotFound, "blob not found: %v", digest)
Expand All @@ -72,7 +72,7 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re
}

// we want to stay just under the 4MB max size restriction in gRPC
sendBuf := make([]byte, (4*1024*1024)-1024)
sendBuf := make([]byte, (4*1024*1024)-5)

for {
n, err := reader.Read(sendBuf)
Expand Down
77 changes: 77 additions & 0 deletions pkg/blob/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var sizes = []bytesize.ByteSize{
512 << 10,
1 << 20,
4 << 20,
8 << 20,
16 << 20,
32 << 20,
64 << 20,
Expand Down Expand Up @@ -117,6 +118,82 @@ func BenchmarkBlobService_Put(b *testing.B) {
}
}

// BenchmarkBlobService_Read measures end-to-end streamed reads over gRPC.
// For each payload size it uploads one blob via Put, then times repeated
// Read streams of that blob, reporting throughput and allocations.
func BenchmarkBlobService_Read(b *testing.B) {
	s := test.RunBasicJetStreamServer(b)
	defer test.ShutdownJSServerAndRemoveStorage(b, s)

	srv, lis := blobServer(s, b)
	defer srv.Stop()

	conn := test.GrpcConn(lis, b)
	client := pb.NewBlobServiceClient(conn)

	for _, size := range sizes {
		size := size // capture for the sub-benchmark closure (pre Go 1.22 semantics)

		// deterministic payload so runs are comparable across invocations
		rng := rand.New(rand.NewSource(1))
		payload := make([]byte, size)
		rng.Read(payload)

		// upload the blob once; every benchmark iteration reads it back
		stream, err := client.Put(context.Background())
		if err != nil {
			b.Fatal(err)
		}

		src := bytes.NewReader(payload)
		chunk := make([]byte, (16*1024*1024)-5)
		for {
			n, readErr := src.Read(chunk)
			if readErr == io.EOF {
				break
			} else if readErr != nil {
				b.Fatal(readErr)
			}
			if sendErr := stream.Send(&pb.BlobChunk{Data: chunk[:n]}); sendErr != nil {
				b.Fatal(sendErr)
			}
		}

		resp, err := stream.CloseAndRecv()
		if err != nil {
			b.Fatal(err)
		}

		b.Run(size.String(), func(b *testing.B) {
			b.SetBytes(int64(size))
			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				var received bytes.Buffer

				read, err := client.Read(context.Background(), &pb.ReadBlobRequest{Digest: resp.Digest})
				if err != nil {
					b.Fatal(err)
				}

				for {
					msg, recvErr := read.Recv()
					if recvErr == io.EOF {
						break
					} else if recvErr != nil {
						b.Fatal(recvErr)
					}
					if _, writeErr := received.Write(msg.Data); writeErr != nil {
						b.Fatal(writeErr)
					}
				}

				// sanity check: the full blob must round-trip intact
				if received.Len() != len(payload) {
					b.Fatalf("Received %v bytes, expected %v", received.Len(), len(payload))
				}
			}
		})
	}
}

func TestBlobService_Put(t *testing.T) {
s := test.RunBasicJetStreamServer(t)
defer test.ShutdownJSServerAndRemoveStorage(t, s)
Expand Down
55 changes: 44 additions & 11 deletions pkg/store/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@ func (c *CachingStore) Stat(key string, ctx context.Context) (ok bool, err error
func (c *CachingStore) Get(key string, ctx context.Context) (reader io.ReadCloser, err error) {
// try in Memory store first
reader, err = c.Memory.Get(key, ctx)
if err == ErrKeyNotFound {
log.Debug("cache miss", "key", key)
// fallback and try the Disk based store last
reader, err = c.Disk.Get(key, ctx)
if err == nil {
reader = cacheWriter{
store: c.Memory,
key: key,
reader: reader,
buf: bytes.NewBuffer(nil),
}
if err == nil {
reader = &cacheReader{
key: key,
disk: c.Disk,
memory: c.Memory,
ctx: ctx,
reader: reader,
}
}
return
Expand Down Expand Up @@ -82,3 +78,40 @@ func (c cacheWriter) Close() (err error) {
}
return
}

// cacheReader reads a value through the in-memory store first and, on a
// cache miss, transparently falls back to the disk store (see Read).
type cacheReader struct {
	key    string          // store key being read
	disk   Store           // authoritative (slower) store used on cache miss
	memory Store           // fast store tried first; repopulated after a miss
	ctx    context.Context // NOTE(review): ctx stored in a struct — Go convention prefers passing it per call; confirm lifetime matches the reader's

	reader  io.ReadCloser // currently active underlying reader (memory, then possibly disk)
	faulted bool          // true once the disk fallback has been attempted; prevents retry loops
}

// Read satisfies io.Reader. It reads from the in-memory store's reader
// first; if that read reports ErrKeyNotFound (presumably the memory store
// resolves the key lazily on first read — TODO confirm), it falls back to
// the disk store exactly once and, on success, tees the disk contents back
// into the memory store via cacheWriter so later reads hit the cache.
func (c *cacheReader) Read(p []byte) (n int, err error) {
	n, err = c.reader.Read(p)
	if err == ErrKeyNotFound && !c.faulted {
		// only ever attempt the disk fallback once
		c.faulted = true
		_ = c.reader.Close()

		log.Debug("cache miss", "key", c.key)
		// fallback and try the Disk based store last
		c.reader, err = c.disk.Get(c.key, c.ctx)
		if err != nil {
			// BUG fix: previously execution fell through to the Read below
			// with c.reader == nil, causing a nil-pointer dereference when
			// the disk store errored; surface the disk error instead.
			return n, err
		}

		// tee everything read from disk back into the memory store
		c.reader = cacheWriter{
			store:  c.memory,
			key:    c.key,
			reader: c.reader,
			buf:    bytes.NewBuffer(nil),
		}

		n, err = c.reader.Read(p)
	}
	return
}

// Close closes whichever underlying reader is currently active
// (the memory reader, or the disk-backed cacheWriter after a fallback).
func (c *cacheReader) Close() error {
	return c.reader.Close()
}
125 changes: 86 additions & 39 deletions pkg/store/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"context"
"io"

"golang.org/x/sync/errgroup"

"github.com/nats-io/nats.go"

"github.com/SaveTheRbtz/fastcdc-go"
pb "github.com/brianmcgee/nvix/protos"
"github.com/charmbracelet/log"
"github.com/djherbis/buffer"
"github.com/djherbis/nio/v3"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"

Expand Down Expand Up @@ -39,7 +39,10 @@ func (c *CdcStore) getMeta(key string, ctx context.Context) (*pb.BlobMeta, error
}()

b, err := io.ReadAll(reader)
if err != nil {

if err == ErrKeyNotFound {
return nil, err
} else if err != nil {
return nil, errors.Annotate(err, "failed to read bytes")
}

Expand All @@ -50,45 +53,17 @@ func (c *CdcStore) getMeta(key string, ctx context.Context) (*pb.BlobMeta, error
return &meta, nil
}

func (c *CdcStore) Stat(key string, ctx context.Context) (ok bool, err error) {
return c.Meta.Stat(key, ctx)
func (c *CdcStore) Stat(digest Digest, ctx context.Context) (ok bool, err error) {
return c.Meta.Stat(digest.String(), ctx)
}

func (c *CdcStore) Get(key string, ctx context.Context) (io.ReadCloser, error) {
meta, err := c.getMeta(key, ctx)
func (c *CdcStore) Get(digest Digest, ctx context.Context) (io.ReadCloser, error) {
meta, err := c.getMeta(digest.String(), ctx)
if err != nil {
return nil, err
}

// create a buffer with an average of 2 chunks
buf := buffer.New(int64(ChunkOptions.AverageSize))
r, w := nio.Pipe(buf)

go c.readChunks(meta, w, ctx)

return r, nil
}

func (c *CdcStore) readChunks(meta *pb.BlobMeta, writer *nio.PipeWriter, ctx context.Context) {
var err error
var reader io.Reader

defer func() {
_ = writer.CloseWithError(err)
}()

for _, chunk := range meta.Chunks {
digest := Digest(chunk.Digest)
if reader, err = c.Chunks.Get(digest.String(), ctx); err != nil {
log.Error("failed to retrieve chunk", "digest", digest, "error", err)
return
} else if _, err = io.Copy(writer, reader); err != nil {
log.Error("failed to copy chunk", "digest", digest, "error", err)
return
}
}

return
return &blobReader{blob: meta, store: c.Chunks, ctx: ctx}, nil
}

func (c *CdcStore) Put(reader io.ReadCloser, ctx context.Context) (*Digest, error) {
Expand Down Expand Up @@ -160,13 +135,13 @@ func (c *CdcStore) Put(reader io.ReadCloser, ctx context.Context) (*Digest, erro
return &blobDigest, nil
}

func (c *CdcStore) Delete(key string, ctx context.Context) error {
meta, err := c.getMeta(key, ctx)
func (c *CdcStore) Delete(digest Digest, ctx context.Context) error {
meta, err := c.getMeta(digest.String(), ctx)
if err != nil {
return err
}

if err = c.Meta.Delete(key, ctx); err != nil {
if err = c.Meta.Delete(digest.String(), ctx); err != nil {
return errors.Annotate(err, "failed to delete metadata entry")
}

Expand All @@ -180,3 +155,75 @@ func (c *CdcStore) Delete(key string, ctx context.Context) error {

return nil
}

// blobReader streams a blob's content by fetching its chunks from the
// underlying chunk store, pre-fetching up to two chunk readers ahead of
// the consumer on a background goroutine.
type blobReader struct {
	blob  *pb.BlobMeta // chunk digests describing the blob
	store Store        // chunk store the readers are fetched from

	eg      *errgroup.Group    // pre-fetch goroutine; nil until first Read
	ctx     context.Context    // parent context supplied by Get
	cancel  context.CancelFunc // stops the pre-fetcher; set on first Read
	readers chan io.ReadCloser // pre-fetched chunk readers, closed by the producer

	reader io.ReadCloser // chunk currently being consumed
}

// Read satisfies io.Reader. On first use it starts a goroutine that fetches
// chunk readers in order and buffers up to two of them; Read then drains each
// chunk reader in sequence, returning io.EOF once all chunks are consumed.
func (c *blobReader) Read(p []byte) (n int, err error) {
	if c.eg == nil {
		// lazily start the chunk pre-fetcher on first read
		var ctx context.Context
		ctx, c.cancel = context.WithCancel(c.ctx)
		c.eg, ctx = errgroup.WithContext(ctx)

		c.readers = make(chan io.ReadCloser, 2)

		c.eg.Go(func() error {
			// close channel on return so the consumer sees the end of stream
			defer close(c.readers)

			b := make([]byte, 0)

			for _, chunk := range c.blob.Chunks {
				r, err := c.store.Get(Digest(chunk.Digest).String(), ctx)
				if err != nil {
					return err
				}

				// tickle the reader to force it to fetch the underlying
				// message so it is ready for reading
				// NOTE(review): a zero-length Read may be a no-op for some
				// readers — confirm the chunk store honors this
				_, err = r.Read(b)
				if err != nil {
					_ = r.Close()
					return err
				}

				// BUG fix: select on ctx so an abandoned consumer cannot
				// block this send forever and leak the goroutine
				select {
				case c.readers <- r:
				case <-ctx.Done():
					_ = r.Close()
					return ctx.Err()
				}
			}
			return nil
		})
	}

	for {
		if c.reader == nil {
			var ok bool
			c.reader, ok = <-c.readers
			if !ok {
				// channel has been closed: all chunks consumed or the
				// producer failed — surface its error, else EOF
				err = c.eg.Wait()
				if err == nil {
					err = io.EOF
				}
				return
			}
		}

		n, err = c.reader.Read(p)
		if err == io.EOF {
			// BUG fix: close the finished chunk reader itself (previously
			// this called the no-op c.Close(), leaking every chunk reader)
			if cerr := c.reader.Close(); cerr != nil {
				return n, cerr
			}
			c.reader = nil
			if n > 0 {
				// BUG fix: don't discard bytes returned alongside io.EOF;
				// report them now and advance to the next chunk on the
				// following call
				return n, nil
			}
			// n == 0: fall through and read from the next chunk
		} else {
			return
		}
	}
}

// Close stops the pre-fetch goroutine (if it was started) and releases the
// in-flight and any pre-fetched chunk readers.
func (c *blobReader) Close() error {
	if c.cancel != nil {
		c.cancel()
	}

	var err error
	if c.reader != nil {
		err = c.reader.Close()
		c.reader = nil
	}

	if c.readers != nil {
		// the producer closes the channel once it observes cancellation,
		// so this drain terminates
		for r := range c.readers {
			_ = r.Close()
		}
		_ = c.eg.Wait()
		c.readers = nil
	}

	return err
}
Loading

0 comments on commit 4be38ef

Please sign in to comment.