// concatReadCloser is an io.ReadCloser that yields the contents of |keys|, in
// order, as one continuous stream. A key is opened through |open| only when its
// bytes are first needed and is closed as soon as it reports io.EOF, so at most
// one underlying reader is held open at any time.
type concatReadCloser struct {
	// ctx is handed to |open| every time a source is opened.
	ctx context.Context
	// keys lists the sources to stream, in output order.
	keys []string
	// open returns a reader for a single key.
	open func(ctx context.Context, key string) (io.ReadCloser, error)
	// cur is the index into |keys| of the source being (or about to be) read.
	cur int
	// curRC is the open reader for keys[cur]; nil when no source is open.
	curRC io.ReadCloser
	// done is set once every key has been consumed or Close has been called.
	done bool
}

// ensureCurrent opens keys[cur] if no source is currently open. It is a no-op
// once the stream is done, and marks the stream done when |cur| has moved past
// the last key. A failed open leaves |cur| untouched, so the next call retries
// the same key.
func (c *concatReadCloser) ensureCurrent() error {
	if c.done || c.curRC != nil {
		return nil
	}
	if c.cur >= len(c.keys) {
		c.done = true
		return nil
	}
	rc, err := c.open(c.ctx, c.keys[c.cur])
	if err != nil {
		return err
	}
	c.curRC = rc
	return nil
}

// closeCurrentAndAdvance closes the open source, if any, and moves on to the
// next key. The index advances even when Close fails; the Close error is
// returned to the caller.
func (c *concatReadCloser) closeCurrentAndAdvance() error {
	c.cur++
	if c.curRC == nil {
		return nil
	}
	err := c.curRC.Close()
	c.curRC = nil
	return err
}

// Read implements io.Reader. It fills |p| from the current source and moves to
// the next source when the current one reports io.EOF. io.EOF is returned only
// after every key has been consumed, or after Close.
func (c *concatReadCloser) Read(p []byte) (int, error) {
	// Many readers answer (0, nil) to an empty buffer for as long as they have
	// data left; without this guard the retry loop below would never terminate.
	if len(p) == 0 {
		return 0, nil
	}
	for {
		if err := c.ensureCurrent(); err != nil {
			return 0, err
		}
		if c.curRC == nil {
			return 0, io.EOF
		}

		n, err := c.curRC.Read(p)
		switch {
		case err == nil:
			if n > 0 {
				return n, nil
			}
			// (0, nil) means "nothing happened": ask the same source again.
		case err == io.EOF:
			// The source is finished: close it and advance right away. Only
			// opening the next source is deferred to a later iteration or call.
			// A Close failure is reported together with any final bytes, so it
			// is surfaced whether EOF arrived with data or on its own.
			cerr := c.closeCurrentAndAdvance()
			if n > 0 || cerr != nil {
				return n, cerr
			}
		default:
			return n, err
		}
	}
}

// Close implements io.Closer. It marks the stream done, so that later Reads
// return io.EOF without opening further sources, and closes the source that is
// currently open, if any.
func (c *concatReadCloser) Close() error {
	c.done = true
	if c.curRC == nil {
		return nil
	}
	err := c.curRC.Close()
	c.curRC = nil
	return err
}

// GitBlobstore is a Blobstore implementation backed by a git repository's object
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
@@ -801,20 +878,149 @@ func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, } func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) { - // Chunked-object support is landing in phases. Concatenate is the final piece - // needed for NBS conjoin and is intentionally left unimplemented on this branch. - // // Keep key validation for consistent error behavior. - _, err := normalizeGitTreePath(key) + var err error + key, err = normalizeGitTreePath(key) if err != nil { return "", err } + if len(sources) == 0 { + return "", fmt.Errorf("gitblobstore: concatenate requires at least one source") + } + normSources := make([]string, 0, len(sources)) for _, src := range sources { - if _, err := normalizeGitTreePath(src); err != nil { + norm, err := normalizeGitTreePath(src) + if err != nil { return "", err } + normSources = append(normSources, norm) + } + sources = normSources + + // For non-manifest keys, match Put's behavior: if the key already exists, succeed without overwriting. + if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil { + return "", err + } else if ok { + return ver, nil + } + + // Resolve a snapshot commit for the sources. + commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return "", err + } + if !ok { + // Consistent with Get: empty store => manifest missing, other keys => ref missing. 
+ if key == "manifest" { + return "", NotFound{Key: key} + } + return "", &git.RefNotFoundError{Ref: gbs.ref} + } + + totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources) + if err != nil { + return "", err + } + + rc := &concatReadCloser{ + ctx: ctx, + keys: sources, + open: func(ctx context.Context, k string) (io.ReadCloser, error) { + return gbs.openReaderAtCommit(ctx, commit, k) + }, + } + defer rc.Close() + + plan, err := gbs.planPutWrites(ctx, key, totalSz, rc) + if err != nil { + return "", err + } + + msg := fmt.Sprintf("gitblobstore: concatenate %s", key) + return gbs.putWithCASRetries(ctx, key, plan, msg) +} + +func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) { + oid, typ, err := gbs.resolveObjectForGet(ctx, commit, key) + if err != nil { + return nil, err + } + switch typ { + case git.ObjectTypeBlob: + return gbs.api.BlobReader(ctx, oid) + case git.ObjectTypeTree: + rc, _, _, err := gbs.openChunkedTreeRange(ctx, commit, key, AllRange) + if err != nil { + // Defensive: resolveObjectForGet succeeded, but keep NotFound mapping consistent. + var pnf *git.PathNotFoundError + if errors.As(err, &pnf) { + return nil, NotFound{Key: key} + } + return nil, err + } + return rc, nil + default: + return nil, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key) + } +} + +// sizeAtCommit returns the byte size of |key| as of |commit|. +// It supports both inline blobs and the chunked-tree representation used by GitBlobstore. +// If |key| is missing at |commit|, it returns NotFound{Key: key}. 
+func (gbs *GitBlobstore) sizeAtCommit(ctx context.Context, commit git.OID, key string) (uint64, error) { + oid, typ, err := gbs.api.ResolvePathObject(ctx, commit, key) + if err != nil { + if git.IsPathNotFound(err) { + return 0, NotFound{Key: key} + } + return 0, err + } + + switch typ { + case git.ObjectTypeBlob: + sz, err := gbs.api.BlobSize(ctx, oid) + if err != nil { + return 0, err + } + if sz < 0 { + return 0, fmt.Errorf("gitblobstore: invalid blob size %d for key %q", sz, key) + } + return uint64(sz), nil + + case git.ObjectTypeTree: + entries, err := gbs.api.ListTree(ctx, commit, key) + if err != nil { + if git.IsPathNotFound(err) { + return 0, NotFound{Key: key} + } + return 0, err + } + _, total, err := gbs.validateAndSizeChunkedParts(ctx, entries) + return total, err + + default: + return 0, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key) + } +} + +// totalSizeAtCommit sums the sizes of |sources| at |commit| and returns the total as int64. +// Returns an error on overflow or if any source is missing. 
+func (gbs *GitBlobstore) totalSizeAtCommit(ctx context.Context, commit git.OID, sources []string) (int64, error) { + var total uint64 + for _, src := range sources { + sz, err := gbs.sizeAtCommit(ctx, commit, src) + if err != nil { + return 0, err + } + if sz > math.MaxUint64-total { + return 0, fmt.Errorf("gitblobstore: concatenated size overflow") + } + total += sz + } + if total > uint64(math.MaxInt64) { + return 0, fmt.Errorf("gitblobstore: concatenated size %d overflows int64", total) } - return "", git.ErrUnimplemented + return int64(total), nil } func sliceInlineBlob(rc io.ReadCloser, sz int64, br BlobRange, ver string) (io.ReadCloser, uint64, string, error) { diff --git a/go/store/blobstore/git_blobstore_helpers_test.go b/go/store/blobstore/git_blobstore_helpers_test.go index 89be72406e4..8080f4e8382 100644 --- a/go/store/blobstore/git_blobstore_helpers_test.go +++ b/go/store/blobstore/git_blobstore_helpers_test.go @@ -15,6 +15,7 @@ package blobstore import ( + "bytes" "context" "errors" "io" @@ -29,6 +30,7 @@ type fakeGitAPI struct { tryResolveRefCommit func(ctx context.Context, ref string) (git.OID, bool, error) resolvePathBlob func(ctx context.Context, commit git.OID, path string) (git.OID, error) resolvePathObject func(ctx context.Context, commit git.OID, path string) (git.OID, git.ObjectType, error) + listTree func(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) blobSize func(ctx context.Context, oid git.OID) (int64, error) blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error) } @@ -46,7 +48,7 @@ func (f fakeGitAPI) ResolvePathObject(ctx context.Context, commit git.OID, path return f.resolvePathObject(ctx, commit, path) } func (f fakeGitAPI) ListTree(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) { - panic("unexpected call") + return f.listTree(ctx, commit, treePath) } func (f fakeGitAPI) CatFileType(ctx context.Context, oid git.OID) (string, error) { panic("unexpected 
call") @@ -230,3 +232,181 @@ func TestGitBlobstoreHelpers_validateAndSizeChunkedParts(t *testing.T) { _, _, err = gbs.validateAndSizeChunkedParts(ctx, []git.TreeEntry{{Name: "1", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}}) require.Error(t, err) } + +func TestGitBlobstoreHelpers_sizeAtCommit(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + + t.Run("blob", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", path) + return git.OID("89abcdef0123456789abcdef0123456789abcdef"), git.ObjectTypeBlob, nil + }, + blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { + require.Equal(t, git.OID("89abcdef0123456789abcdef0123456789abcdef"), gotOID) + return 123, nil + }, + } + gbs := &GitBlobstore{api: api} + sz, err := gbs.sizeAtCommit(ctx, commit, "k") + require.NoError(t, err) + require.Equal(t, uint64(123), sz) + }) + + t.Run("chunkedTree", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", path) + return git.OID("treeoid"), git.ObjectTypeTree, nil + }, + listTree: func(ctx context.Context, gotCommit git.OID, treePath string) ([]git.TreeEntry, error) { + require.Equal(t, commit, gotCommit) + require.Equal(t, "k", treePath) + return []git.TreeEntry{ + {Name: "0001", Type: git.ObjectTypeBlob, OID: "0123456789abcdef0123456789abcdef01234567"}, + {Name: "0002", Type: git.ObjectTypeBlob, OID: "89abcdef0123456789abcdef0123456789abcdef"}, + }, nil + }, + blobSize: func(ctx context.Context, oid git.OID) (int64, error) { + switch oid { + case "0123456789abcdef0123456789abcdef01234567": + return 3, nil + case 
"89abcdef0123456789abcdef0123456789abcdef": + return 5, nil + default: + return 0, errors.New("unexpected oid") + } + }, + } + gbs := &GitBlobstore{api: api} + sz, err := gbs.sizeAtCommit(ctx, commit, "k") + require.NoError(t, err) + require.Equal(t, uint64(8), sz) + }) + + t.Run("notFound", func(t *testing.T) { + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + return git.OID(""), git.ObjectTypeUnknown, &git.PathNotFoundError{Commit: gotCommit.String(), Path: path} + }, + } + gbs := &GitBlobstore{api: api} + _, err := gbs.sizeAtCommit(ctx, commit, "missing") + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "missing", nf.Key) + }) +} + +func TestGitBlobstoreHelpers_totalSizeAtCommit_overflowInt64(t *testing.T) { + ctx := context.Background() + commit := git.OID("0123456789abcdef0123456789abcdef01234567") + + api := fakeGitAPI{ + resolvePathObject: func(ctx context.Context, gotCommit git.OID, path string) (git.OID, git.ObjectType, error) { + return git.OID(path + "_oid"), git.ObjectTypeBlob, nil + }, + blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { + // Make the total exceed int64 max with two sources. 
+ if gotOID == "a_oid" { + return int64(^uint64(0) >> 1), nil // math.MaxInt64 without importing math + } + return 1, nil + }, + } + gbs := &GitBlobstore{api: api} + _, err := gbs.totalSizeAtCommit(ctx, commit, []string{"a", "b"}) + require.Error(t, err) +} + +func TestConcatReadCloser(t *testing.T) { + ctx := context.Background() + closed := map[string]int{} + opened := map[string]int{} + + mk := func(s string) io.ReadCloser { + r := bytes.NewReader([]byte(s)) + return &trackedReadCloser{ + r: r, + onClose: func() { + closed[s]++ + }, + } + } + + crc := &concatReadCloser{ + ctx: ctx, + keys: []string{"a", "b"}, + open: func(ctx context.Context, key string) (io.ReadCloser, error) { + opened[key]++ + if key == "a" { + return mk("hi"), nil + } + return mk("there"), nil + }, + } + + out, err := io.ReadAll(crc) + require.NoError(t, err) + require.Equal(t, "hithere", string(out)) + require.NoError(t, crc.Close()) + require.Equal(t, 1, opened["a"]) + require.Equal(t, 1, opened["b"]) + require.Equal(t, 1, closed["hi"]) + require.Equal(t, 1, closed["there"]) +} + +func TestConcatReadCloser_CloseEarlyClosesCurrent(t *testing.T) { + ctx := context.Background() + closed := map[string]int{} + opened := map[string]int{} + + mk := func(id string, s string) io.ReadCloser { + r := bytes.NewReader([]byte(s)) + return &trackedReadCloser{ + r: r, + onClose: func() { + closed[id]++ + }, + } + } + + crc := &concatReadCloser{ + ctx: ctx, + keys: []string{"a", "b"}, + open: func(ctx context.Context, key string) (io.ReadCloser, error) { + opened[key]++ + if key == "a" { + return mk("a", "hello"), nil + } + return mk("b", "world"), nil + }, + } + + buf := make([]byte, 1) + n, err := crc.Read(buf) + require.Equal(t, 1, n) + require.NoError(t, err) + + require.NoError(t, crc.Close()) + require.Equal(t, 1, opened["a"]) + require.Equal(t, 0, opened["b"], "expected not to open second reader when closing early") + require.Equal(t, 1, closed["a"]) + require.Equal(t, 0, closed["b"]) +} + +type 
trackedReadCloser struct { + r io.Reader + onClose func() +} + +func (t *trackedReadCloser) Read(p []byte) (int, error) { return t.r.Read(p) } +func (t *trackedReadCloser) Close() error { + if t.onClose != nil { + t.onClose() + } + return nil +} diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index 1d41173effe..18b7156ec55 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -255,6 +255,141 @@ func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) { require.Equal(t, want, got) } +func TestGitBlobstore_Concatenate_Basic(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = PutBytes(ctx, bs, "a", []byte("hi ")) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", []byte("there")) + require.NoError(t, err) + + ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.NotEmpty(t, ver) + + got, ver2, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, []byte("hi there"), got) +} + +func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + Identity: testIdentity(), + MaxPartSize: 1024, + }) + require.NoError(t, err) + + a := bytes.Repeat([]byte("a"), 700) + b := bytes.Repeat([]byte("b"), 700) + want := append(append([]byte(nil), a...), b...) 
+ + _, err = PutBytes(ctx, bs, "a", a) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", b) + require.NoError(t, err) + + ver, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.NotEmpty(t, ver) + + // Verify the resulting key is stored as a chunked tree (not a single blob). + head, ok, err := bs.api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeTree, typ) + require.Equal(t, oid.String(), ver) + + parts, err := bs.api.ListTree(ctx, head, "c") + require.NoError(t, err) + require.GreaterOrEqual(t, len(parts), 2) + require.Equal(t, "00000001", parts[0].Name) + + got, ver2, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, want, got) +} + +func TestGitBlobstore_Concatenate_KeyExistsFastSucceeds(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + ver1, err := PutBytes(ctx, bs, "c", []byte("original")) + require.NoError(t, err) + require.NotEmpty(t, ver1) + + _, err = PutBytes(ctx, bs, "a", []byte("new ")) + require.NoError(t, err) + _, err = PutBytes(ctx, bs, "b", []byte("value")) + require.NoError(t, err) + + ver2, err := bs.Concatenate(ctx, "c", []string{"a", "b"}) + require.NoError(t, err) + require.Equal(t, ver1, ver2, "expected concatenate to fast-succeed without overwriting existing key") + + got, ver3, err := GetBytes(ctx, bs, "c", AllRange) + require.NoError(t, err) + require.Equal(t, ver1, ver3) + require.Equal(t, []byte("original"), got) +} + +func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, 
err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = PutBytes(ctx, bs, "present", []byte("x")) + require.NoError(t, err) + + _, err = bs.Concatenate(ctx, "c", []string{"present", "missing"}) + require.Error(t, err) + require.True(t, IsNotFoundError(err)) + var nf NotFound + require.ErrorAs(t, err, &nf) + require.Equal(t, "missing", nf.Key) +} + +func TestGitBlobstore_Concatenate_EmptySourcesErrors(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + _, err = bs.Concatenate(ctx, "c", nil) + require.Error(t, err) +} + type putShouldNotRead struct{} func (putShouldNotRead) Read(_ []byte) (int, error) {