From 38d46a0df9b5644664ae0809d21fd92238c996fc Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Mon, 9 Oct 2023 10:11:45 +0100 Subject: [PATCH] feat: implement directory service --- flake.lock | 32 +++--- internal/cli/cli.go | 4 +- internal/cli/store/run.go | 20 +++- nix/dev/tvix.nix | 3 +- pkg/{store => blob}/config.go | 23 ++-- pkg/blob/grpc.go | 23 ++-- pkg/directory/config.go | 66 +++++++++++ pkg/directory/directory.go | 87 ++++++++++++++ pkg/directory/grpc.go | 209 ++++++++++++++++++++++++++++++++++ pkg/store/cdc_test.go | 92 ++++++++++++++- pkg/store/types.go | 4 +- 11 files changed, 517 insertions(+), 46 deletions(-) rename pkg/{store => blob}/config.go (83%) create mode 100644 pkg/directory/config.go create mode 100644 pkg/directory/directory.go create mode 100644 pkg/directory/grpc.go diff --git a/flake.lock b/flake.lock index 5675ef3..ebc7544 100644 --- a/flake.lock +++ b/flake.lock @@ -3,11 +3,11 @@ "depot": { "flake": false, "locked": { - "lastModified": 1696110457, - "narHash": "sha256-/Iynr6hKA8AYY2xQkr9xwAUFe/3Bh39xEC9a7JFPNao=", + "lastModified": 1696794325, + "narHash": "sha256-diTMU+qDtsDmb4CkzPqJAbXcl+IfR98iw8pp5ceDjSc=", "ref": "refs/heads/canon", - "rev": "c5cb622d024cbda87711d83f8ad1214312d397e4", - "revCount": 18825, + "rev": "f4787355a4a7e172c4ba77d160d7a21a5b8032e6", + "revCount": 18878, "type": "git", "url": "https://cl.tvl.fyi/depot" }, @@ -40,11 +40,11 @@ "flake-compat": { "flake": false, "locked": { - "lastModified": 1673956053, - "narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=", + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", "owner": "edolstra", "repo": "flake-compat", - "rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", "type": "github" }, "original": { @@ -58,11 +58,11 @@ "nixpkgs-lib": "nixpkgs-lib" }, "locked": { - "lastModified": 1693611461, - "narHash": "sha256-aPODl8vAgGQ0ZYFIRisxYG5MOGSkIczvu2Cd8Gb9+1Y=", + "lastModified": 1696343447, + "narHash": "sha256-B2xAZKLkkeRFG5XcHHSXXcP7To9Xzr59KXeZiRf4vdQ=", "owner": "hercules-ci", "repo": "flake-parts", - "rev": "7f53fdb7bdc5bb237da7fefef12d099e4fd611ca", + "rev": "c9afaba3dfa4085dbd2ccb38dfade5141e33d9d4", "type": "github" }, "original": { @@ -88,11 +88,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1695806987, - "narHash": "sha256-fX5kGs66NZIxCMcpAGIpxuftajHL8Hil1vjHmjjl118=", + "lastModified": 1696725822, + "narHash": "sha256-B7uAOS7TkLlOg1aX01rQlYbydcyB6ZnLJSfaYbKVww8=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "f3dab3509afca932f3f4fd0908957709bb1c1f57", + "rev": "5aabb5780a11c500981993d49ee93cfa6df9307b", "type": "github" }, "original": { @@ -105,11 +105,11 @@ "nixpkgs-lib": { "locked": { "dir": "lib", - "lastModified": 1693471703, - "narHash": "sha256-0l03ZBL8P1P6z8MaSDS/MvuU8E75rVxe5eE1N6gxeTo=", + "lastModified": 1696019113, + "narHash": "sha256-X3+DKYWJm93DRSdC5M6K5hLqzSya9BjibtBsuARoPco=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "3e52e76b70d5508f3cec70b882a29199f4d1ee85", + "rev": "f5892ddac112a1e9b3612c39af1b72987ee5783a", "type": "github" }, "original": { diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 545dcd4..3bb25e3 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1,6 +1,8 @@ package cli -import "github.com/brianmcgee/nvix/internal/cli/store" +import ( + "github.com/brianmcgee/nvix/internal/cli/store" +) var Cli struct { Store store.Cli `cmd:""` diff --git a/internal/cli/store/run.go b/internal/cli/store/run.go index 37a9218..df93b62 100644 --- a/internal/cli/store/run.go +++ b/internal/cli/store/run.go @@ -6,6 +6,8 @@ import ( "runtime/debug" "syscall" + "github.com/brianmcgee/nvix/pkg/directory" + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -28,13 +30,15 @@ import ( type Run struct { NatsUrl string `short:"n" env:"NVIX_STORE_NATS_URL" default:"nats://localhost:4222"` - NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:""` + NatsCredentials string `short:"c" env:"NVIX_STORE_NATS_CREDENTIALS_FILE" required:"" type:"path"` ListenAddr string `short:"l" env:"NVIX_STORE_LISTEN_ADDR" default:"localhost:5000"` MetricsAddr string `short:"m" env:"NVIX_STORE_METRICS_ADDR" default:"localhost:5050"` } func (r *Run) Run() error { + log.SetLevel(log.DebugLevel) + log.Debug("connecting to NATS", "url", r.NatsUrl, "creds", r.NatsCredentials) conn, err := nats.Connect(r.NatsUrl, nats.UserCredentials(r.NatsCredentials)) @@ -42,9 +46,14 @@ func (r *Run) Run() error { log.Fatalf("failed to connect to nats: %v", err) } - service, err := blob.NewService(conn) + blobService, err := blob.NewService(conn) + if err != nil { + log.Fatalf("failed to create blob service: %v", err) + } + + directoryService, err := directory.NewService(conn) if err != nil { - log.Fatalf("failed to create blob service") + log.Fatalf("failed to create directory service: %v", err) } // setup metrics @@ -66,7 +75,7 @@ func (r *Run) Run() error { }) grpcPanicRecoveryHandler := func(p any) (err error) { panicsTotal.Inc() - rpcLogger.Error("recovered from panic", "panic", p, "stack", debug.Stack()) + rpcLogger.Error("recovered from panic", "panic", p, "stack", string(debug.Stack())) return status.Errorf(codes.Internal, "%s", p) } @@ -86,7 +95,8 @@ func (r *Run) Run() error { } grpcServer := grpc.NewServer(opts...) - pb.RegisterBlobServiceServer(grpcServer, service) + pb.RegisterBlobServiceServer(grpcServer, blobService) + pb.RegisterDirectoryServiceServer(grpcServer, directoryService) srvMetrics.InitializeMetrics(grpcServer) diff --git a/nix/dev/tvix.nix b/nix/dev/tvix.nix index bb943ab..b55dfed 100644 --- a/nix/dev/tvix.nix +++ b/nix/dev/tvix.nix @@ -25,7 +25,8 @@ } { name = "DIRECTORY_SERVICE_ADDR"; - eval = "sled://$TVIX_HOME/store/directory"; + value = "grpc+http://localhost:5000"; + # eval = "sled://$TVIX_HOME/store/directory"; } { name = "TVIX_MOUNT_DIR"; diff --git a/pkg/store/config.go b/pkg/blob/config.go similarity index 83% rename from pkg/store/config.go rename to pkg/blob/config.go index 3eeaf42..7f40de6 100644 --- a/pkg/store/config.go +++ b/pkg/blob/config.go @@ -1,13 +1,14 @@ -package store +package blob import ( + "github.com/brianmcgee/nvix/pkg/store" "github.com/brianmcgee/nvix/pkg/subject" "github.com/nats-io/nats.go" ) var ( DiskBasedStreamConfig = nats.StreamConfig{ - Name: "store", + Name: "blob_store", Subjects: []string{ subject.WithPrefix("STORE.BLOB.*"), subject.WithPrefix("STORE.CHUNK.*"), @@ -27,7 +28,7 @@ var ( } MemoryBasedStreamConfig = nats.StreamConfig{ - Name: "cache", + Name: "blob_cache", Subjects: []string{ subject.WithPrefix("CACHE.BLOB.*"), subject.WithPrefix("CACHE.CHUNK.*"), @@ -41,51 +42,51 @@ var ( } ) -func NewChunkStore(conn *nats.Conn) Store { +func NewChunkStore(conn *nats.Conn) store.Store { diskPrefix := DiskBasedStreamConfig.Subjects[1] diskPrefix = diskPrefix[:len(diskPrefix)-2] memoryPrefix := MemoryBasedStreamConfig.Subjects[1] memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] - disk := &NatsStore{ + disk := &store.NatsStore{ Conn: conn, StreamConfig: &DiskBasedStreamConfig, SubjectPrefix: diskPrefix, } - memory := &NatsStore{ + memory := &store.NatsStore{ Conn: conn, StreamConfig: &MemoryBasedStreamConfig, SubjectPrefix: memoryPrefix, } - return &CachingStore{ + return &store.CachingStore{ Disk: disk, Memory: memory, } } -func NewMetaStore(conn *nats.Conn) Store { +func NewMetaStore(conn *nats.Conn) store.Store { diskPrefix := DiskBasedStreamConfig.Subjects[0] diskPrefix = diskPrefix[:len(diskPrefix)-2] memoryPrefix := MemoryBasedStreamConfig.Subjects[0] memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] - disk := &NatsStore{ + disk := &store.NatsStore{ Conn: conn, StreamConfig: &DiskBasedStreamConfig, SubjectPrefix: diskPrefix, } - memory := &NatsStore{ + memory := &store.NatsStore{ Conn: conn, StreamConfig: &MemoryBasedStreamConfig, SubjectPrefix: memoryPrefix, } - return &CachingStore{ + return &store.CachingStore{ Disk: disk, Memory: memory, } diff --git a/pkg/blob/grpc.go b/pkg/blob/grpc.go index 64059d7..1c1ee6c 100644 --- a/pkg/blob/grpc.go +++ b/pkg/blob/grpc.go @@ -20,19 +20,19 @@ func NewService(conn *nats.Conn) (capb.BlobServiceServer, error) { return nil, errors.Annotate(err, "failed to create a JetStream context") } - if _, err := js.AddStream(&store.DiskBasedStreamConfig); err != nil { + if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { return nil, errors.Annotate(err, "failed to create disk based stream") } - if _, err := js.AddStream(&store.MemoryBasedStreamConfig); err != nil { + if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { return nil, errors.Annotate(err, "failed to create memory based stream") } return &service{ conn: conn, store: &store.CdcStore{ - Meta: store.NewMetaStore(conn), - Chunks: store.NewChunkStore(conn), + Meta: NewMetaStore(conn), + Chunks: NewChunkStore(conn), }, }, nil } @@ -45,6 +45,9 @@ type service struct { } func (s *service) Stat(ctx context.Context, request *capb.StatBlobRequest) (*capb.BlobMeta, error) { + l := log.WithPrefix("blob.stat") + l.Debug("executing", "digest", store.Digest(request.GetDigest())) + digest := store.Digest(request.Digest) ok, err := s.store.Stat(digest, ctx) if err != nil { @@ -58,6 +61,8 @@ func (s *service) Stat(ctx context.Context, request *capb.StatBlobRequest) (*cap } func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_ReadServer) error { + l := log.WithPrefix("blob.read") + ctx, cancel := context.WithCancel(server.Context()) defer cancel() @@ -67,7 +72,7 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re if err == store.ErrKeyNotFound { return status.Errorf(codes.NotFound, "blob not found: %v", digest) } else if err != nil { - log.Error("failed to get blob", "digest", digest, "error", err) + l.Error("failed to get blob", "digest", digest, "error", err) return status.Error(codes.Internal, "internal error") } @@ -80,14 +85,14 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re _ = reader.Close() break } else if err != nil { - log.Errorf("failed to read next chunk: %v", err) + l.Errorf("failed to read next chunk: %v", err) return status.Error(codes.Internal, "internal error") } if err = server.Send(&capb.BlobChunk{ Data: sendBuf[:n], }); err != nil { - log.Errorf("failed to send blob chunk to client: %v", err) + l.Errorf("failed to send blob chunk to client: %v", err) return err } } @@ -96,6 +101,8 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re } func (s *service) Put(server capb.BlobService_PutServer) (err error) { + l := log.WithPrefix("blob.put") + ctx, cancel := context.WithCancel(server.Context()) defer cancel() @@ -103,7 +110,7 @@ func (s *service) Put(server capb.BlobService_PutServer) (err error) { digest, err := s.store.Put(&reader, ctx) if err != nil { - log.Error("failed to put blob", "error", err) + l.Error("failed to put blob", "error", err) return status.Error(codes.Internal, "internal error") } diff --git a/pkg/directory/config.go b/pkg/directory/config.go new file mode 100644 index 0000000..6bb08ee --- /dev/null +++ b/pkg/directory/config.go @@ -0,0 +1,66 @@ +package directory + +import ( + "github.com/brianmcgee/nvix/pkg/store" + "github.com/brianmcgee/nvix/pkg/subject" + "github.com/nats-io/nats.go" +) + +var ( + DiskBasedStreamConfig = nats.StreamConfig{ + Name: "directory_store", + Subjects: []string{ + subject.WithPrefix("STORE.DIRECTORY.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.FileStorage, + AllowRollup: true, + AllowDirect: true, + Compression: nats.S2Compression, + // automatically publish into the cache topic + RePublish: &nats.RePublish{ + Source: subject.WithPrefix("STORE.DIRECTORY.*"), + Destination: subject.WithPrefix("CACHE.DIRECTORY.{{wildcard(1)}}"), + }, + } + + MemoryBasedStreamConfig = nats.StreamConfig{ + Name: "directory_cache", + Subjects: []string{ + subject.WithPrefix("CACHE.DIRECTORY.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.MemoryStorage, + AllowRollup: true, + AllowDirect: true, + } +) + +func NewDirectoryStore(conn *nats.Conn) store.Store { + diskPrefix := DiskBasedStreamConfig.Subjects[0] + diskPrefix = diskPrefix[:len(diskPrefix)-2] + + memoryPrefix := MemoryBasedStreamConfig.Subjects[0] + memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] + + disk := &store.NatsStore{ + Conn: conn, + StreamConfig: &DiskBasedStreamConfig, + SubjectPrefix: diskPrefix, + } + + memory := &store.NatsStore{ + Conn: conn, + StreamConfig: &MemoryBasedStreamConfig, + SubjectPrefix: memoryPrefix, + } + + return &store.CachingStore{ + Disk: disk, + Memory: memory, + } +} diff --git a/pkg/directory/directory.go b/pkg/directory/directory.go new file mode 100644 index 0000000..0672c4b --- /dev/null +++ b/pkg/directory/directory.go @@ -0,0 +1,87 @@ +package directory + +import ( + "bytes" + "strings" + + capb "code.tvl.fyi/tvix/castore/protos" + "github.com/juju/errors" +) + +const ( + selfReference = "." + parentReference = ".." + + ErrEmptyName = errors.ConstError("name cannot be an empty string") + ErrNameWithSlash = errors.ConstError("name cannot contain slashes: '/'") + ErrNameWithNullByte = errors.ConstError("name cannot contain null bytes") + ErrNameWithSelfReference = errors.ConstError("name cannot be a self reference: '.'") + ErrNameWithParentReference = errors.ConstError("name cannot be a parent reference: '..'") + ErrNamesAreNotSorted = errors.ConstError("names must be lexicographically sorted") +) + +type DirEntry interface { + GetName() []byte +} + +func validateDirectory(directory *capb.Directory) error { + cache := make(map[string]DirEntry) + + validate := func(name string, entry DirEntry) error { + if err := validateName(name); err != nil { + return err + } else if _, ok := cache[name]; ok { + return errors.Errorf("duplicate name: %v", name) + } + cache[name] = entry + return nil + } + + var lastName []byte + for _, dir := range directory.Directories { + if err := validate(string(dir.Name), dir); err != nil { + return err + } else if len(lastName) > 0 && bytes.Compare(lastName, dir.Name) > 0 { + return ErrNamesAreNotSorted + } + lastName = dir.Name + } + + lastName = nil + for _, file := range directory.Files { + if err := validate(string(file.Name), file); err != nil { + return err + } else if len(lastName) > 0 && bytes.Compare(lastName, file.Name) > 0 { + return ErrNamesAreNotSorted + } + lastName = file.Name + } + + lastName = nil + for _, link := range directory.Symlinks { + if err := validate(string(link.Name), link); err != nil { + return err + } else if len(lastName) > 0 && bytes.Compare(lastName, link.Name) > 0 { + return ErrNamesAreNotSorted + } + lastName = link.Name + } + + return nil +} + +func validateName(name string) error { + if name == "" { + return ErrEmptyName + } else if name == selfReference { + return ErrNameWithSelfReference + } else if name == parentReference { + return ErrNameWithParentReference + } else if strings.Contains(name, "/") { + return ErrNameWithSlash + } else if strings.IndexByte(name, '\x00') > -1 { + return ErrNameWithNullByte + } + + return nil +} diff --git a/pkg/directory/grpc.go b/pkg/directory/grpc.go new file mode 100644 index 0000000..7a12563 --- /dev/null +++ b/pkg/directory/grpc.go @@ -0,0 +1,209 @@ +package directory + +import ( + "bytes" + "context" + "encoding/base64" + "io" + + capb "code.tvl.fyi/tvix/castore/protos" + + "github.com/brianmcgee/nvix/pkg/store" + "github.com/charmbracelet/log" + "github.com/golang/protobuf/proto" + "github.com/juju/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/nats-io/nats.go" +) + +func NewService(conn *nats.Conn) (capb.DirectoryServiceServer, error) { + js, err := conn.JetStream() + if err != nil { + return nil, errors.Annotate(err, "failed to create a JetStream context") + } + + if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { + return nil, errors.Annotate(err, "failed to create disk based stream") + } + + if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { + return nil, errors.Annotate(err, "failed to create memory based stream") + } + + return &service{ + conn: conn, + store: NewDirectoryStore(conn), + }, nil +} + +type service struct { + capb.UnimplementedDirectoryServiceServer + conn *nats.Conn + store store.Store +} + +// Get retrieves a stream of Directory messages, by using the lookup +// parameters in GetDirectoryRequest. +// Keep in mind multiple DirectoryNodes in different parts of the graph might +// have the same digest if they have the same underlying contents, +// so sending subsequent ones can be omitted. +func (s *service) Get(req *capb.GetDirectoryRequest, server capb.DirectoryService_GetServer) error { + l := log.WithPrefix("directory.get") + + rootDigest := store.Digest(req.GetDigest()) + l.Debug("request", "digest", rootDigest) + + ctx, cancel := context.WithCancel(server.Context()) + defer cancel() + + fetch := func(digest store.Digest) (*capb.Directory, error) { + reader, err := s.store.Get(digest.String(), ctx) + if err != nil { + l.Errorf("failure: %v", err) + return nil, status.Errorf(codes.NotFound, "digest not found: %v", digest) + } + defer func() { + _ = reader.Close() + }() + + b, err := io.ReadAll(reader) + if err != nil { + l.Errorf("failure: %v", err) + return nil, status.Error(codes.Internal, "failed to read directory entry from store") + } + var dir capb.Directory + if err = proto.Unmarshal(b, &dir); err != nil { + l.Errorf("failure: %v", err) + return nil, status.Error(codes.Internal, "failed to unmarshal directory entry from store") + } + return &dir, nil + } + + // todo handle get by what + + rootDirectory, err := fetch(rootDigest) + if err != nil { + l.Errorf("failure: %v", err) + return status.Errorf(codes.NotFound, "directory not found: %v", rootDigest) + } + + dirs := []*capb.Directory{rootDirectory} + + iterateDirs := func(directory *capb.Directory) error { + for _, dir := range directory.Directories { + digest := store.Digest(dir.Digest) + if d, err := fetch(digest); err != nil { + return err + } else { + dirs = append(dirs, d) + } + } + return nil + } + + for _, dir := range dirs { + if err = iterateDirs(dir); err != nil { + l.Errorf("failure: %v", err) + return status.Error(codes.Internal, "failed to iterate directories") + } else if err = server.Send(dir); err != nil { + l.Errorf("failure: %v", err) + return status.Error(codes.Internal, "failed to send directory") + } + // remove from head + dirs = dirs[1:] + } + + return nil +} + +// Put uploads a graph of Directory messages. +// Individual Directory messages need to be send in an order walking up +// from the leaves to the root - a Directory message can only refer to +// Directory messages previously sent in the same stream. +// Keep in mind multiple DirectoryNodes in different parts of the graph might +// have the same digest if they have the same underlying contents, +// so sending subsequent ones can be omitted. +// We might add a separate method, allowing to send partial graphs at a later +// time, if requiring to send the full graph turns out to be a problem. +func (s *service) Put(server capb.DirectoryService_PutServer) error { + l := log.WithPrefix("directory.put") + + ctx, cancel := context.WithCancel(server.Context()) + defer cancel() + + var rootDigest []byte + var futures []nats.PubAckFuture + + cache := make(map[string]*capb.Directory) + + for { + directory, err := server.Recv() + if err == io.EOF { + break + } else if err != nil { + l.Error("failed to receive directory", "err", err) + return status.Errorf(codes.Unknown, "failed to receive directory") + } + + if err := validateDirectory(directory); err != nil { + return status.Errorf(codes.InvalidArgument, "bad request: %v", err) + } + + digest, err := directory.Digest() + if err != nil { + return status.Error(codes.Unknown, "failed to generate directory digest") + } + + digestStr := base64.StdEncoding.EncodeToString(digest) + cache[digestStr] = directory + + for _, node := range directory.Directories { + target := base64.StdEncoding.EncodeToString(node.Digest) + if _, ok := cache[target]; !ok { + return status.Errorf(codes.InvalidArgument, "directory node refers to unknown directory digest: %v", target) + } + } + + b, err := proto.Marshal(directory) + if err != nil { + l.Error("failed to marshal directory", "err", err) + return status.Error(codes.Internal, "failed to marshal directory") + } + + future, err := s.store.PutAsync(digestStr, io.NopCloser(bytes.NewReader(b)), ctx) + if err != nil { + l.Error("failed to put directory in store", "err", err) + return status.Errorf(codes.Internal, "failed to put directory in store") + } + + rootDigest = digest + futures = append(futures, future) + } + + for _, f := range futures { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-f.Err(): + // TODO how to handle partial writes due to failure? + l.Error(codes.Internal, "put future has returned an error", "err", err) + return status.Errorf(codes.Internal, "failed to put directory in store") + case ack := <-f.Ok(): + l.Debug("put acknowledged", "ack", ack) + } + } + + l.Debug("all puts complete") + + resp := &capb.PutDirectoryResponse{ + RootDigest: rootDigest, + } + if err := server.SendAndClose(resp); err != nil { + l.Error("failed to send put response", "err", err) + } + + l.Debug("finished") + return nil +} diff --git a/pkg/store/cdc_test.go b/pkg/store/cdc_test.go index c0beac5..4ebd2fc 100644 --- a/pkg/store/cdc_test.go +++ b/pkg/store/cdc_test.go @@ -7,6 +7,8 @@ import ( "math/rand" "testing" + "github.com/brianmcgee/nvix/pkg/subject" + "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" @@ -16,6 +18,92 @@ import ( "github.com/inhies/go-bytesize" ) +var ( + DiskBasedStreamConfig = nats.StreamConfig{ + Name: "blob_store", + Subjects: []string{ + subject.WithPrefix("STORE.BLOB.*"), + subject.WithPrefix("STORE.CHUNK.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.FileStorage, + AllowRollup: true, + AllowDirect: true, + Compression: nats.S2Compression, + // automatically publish into the cache topic + RePublish: &nats.RePublish{ + Source: subject.WithPrefix("STORE.*.*"), + Destination: subject.WithPrefix("CACHE.{{wildcard(1)}}.{{wildcard(2)}}"), + }, + } + + MemoryBasedStreamConfig = nats.StreamConfig{ + Name: "blob_cache", + Subjects: []string{ + subject.WithPrefix("CACHE.BLOB.*"), + subject.WithPrefix("CACHE.CHUNK.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.MemoryStorage, + AllowRollup: true, + AllowDirect: true, + } +) + +func newChunkStore(conn *nats.Conn) Store { + diskPrefix := DiskBasedStreamConfig.Subjects[1] + diskPrefix = diskPrefix[:len(diskPrefix)-2] + + memoryPrefix := MemoryBasedStreamConfig.Subjects[1] + memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] + + disk := &NatsStore{ + Conn: conn, + StreamConfig: &DiskBasedStreamConfig, + SubjectPrefix: diskPrefix, + } + + memory := &NatsStore{ + Conn: conn, + StreamConfig: &MemoryBasedStreamConfig, + SubjectPrefix: memoryPrefix, + } + + return &CachingStore{ + Disk: disk, + Memory: memory, + } +} + +func newMetaStore(conn *nats.Conn) Store { + diskPrefix := DiskBasedStreamConfig.Subjects[0] + diskPrefix = diskPrefix[:len(diskPrefix)-2] + + memoryPrefix := MemoryBasedStreamConfig.Subjects[0] + memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] + + disk := &NatsStore{ + Conn: conn, + StreamConfig: &DiskBasedStreamConfig, + SubjectPrefix: diskPrefix, + } + + memory := &NatsStore{ + Conn: conn, + StreamConfig: &MemoryBasedStreamConfig, + SubjectPrefix: memoryPrefix, + } + + return &CachingStore{ + Disk: disk, + Memory: memory, + } +} + func newCdcStore(t test.TestingT, conn *nats.Conn, js nats.JetStreamContext) *CdcStore { if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { t.Fatal(err) @@ -26,8 +114,8 @@ func newCdcStore(t test.TestingT, conn *nats.Conn, js nats.JetStreamContext) *Cd } return &CdcStore{ - Meta: NewMetaStore(conn), - Chunks: NewChunkStore(conn), + Meta: newMetaStore(conn), + Chunks: newChunkStore(conn), } } diff --git a/pkg/store/types.go b/pkg/store/types.go index cd39c1a..73ce012 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -2,12 +2,12 @@ package store import ( "context" + "encoding/base64" "io" "github.com/nats-io/nats.go" "github.com/juju/errors" - "github.com/nix-community/go-nix/pkg/nixbase32" ) const ( @@ -17,7 +17,7 @@ const ( type Digest [32]byte func (d Digest) String() string { - return nixbase32.EncodeToString(d[:]) + return base64.StdEncoding.EncodeToString(d[:]) } type Store interface {