diff --git a/go/store/blobstore/git_blobstore.go b/go/store/blobstore/git_blobstore.go index 9c09266968a..10063b6fe07 100644 --- a/go/store/blobstore/git_blobstore.go +++ b/go/store/blobstore/git_blobstore.go @@ -57,6 +57,13 @@ type putPlan struct { chunked bool } +// pendingWrite holds a deferred non-manifest write that will be flushed +// in a single commit+push when CheckAndPut("manifest") is called. +type pendingWrite struct { + key string + plan putPlan +} + type limitReadCloser struct { r io.Reader c io.Closer @@ -280,6 +287,12 @@ type GitBlobstore struct { // A zero value means "disabled" (store values inline as a single git blob). maxPartSize uint64 + // pendingWrites accumulates non-manifest writes that will be flushed in a single + // commit+push when CheckAndPut("manifest") is called. This avoids per-key + // fetch/commit/push cycles for content-addressed (immutable) table file blobs. + // Guarded by gbs.mu. + pendingWrites []pendingWrite + // cacheMu guards all cache fields below. cacheMu sync.RWMutex // cacheHead is the last commit OID whose tree we merged into the cache. 
@@ -662,11 +675,11 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string func (gbs *GitBlobstore) putWithRemoteSync(ctx context.Context, key string, plan putPlan, msg string) (string, error) { return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) { - return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, plan, msg) + return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, plan, msg, nil) }) } -func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string) (string, error) { +func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string, extraWrites []pendingWrite) (string, error) { var cachedPlan *putPlan return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) { actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, ok, key) @@ -683,7 +696,7 @@ func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expected } cachedPlan = &plan } - return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, *cachedPlan, msg) + return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, *cachedPlan, msg, extraWrites) }) } @@ -692,13 +705,18 @@ func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) { if err != nil { return false, err } + + // For non-manifest content-addressed keys, check the cache first. + // If found, skip the remote fetch (covers writes deferred via enqueuePendingWrite and prior fetches). 
+ if key != gitblobstoreManifestKey { + if _, ok := gbs.cacheGetObject(key); ok { + return true, nil + } + } + if err := gbs.syncForRead(ctx); err != nil { return false, err } - // Note: because the cache is merge-only and treats non-manifest keys as immutable once cached, - // Exists(key) may return true for keys that were seen in a past fetched head but are no longer - // present in the current remote head (e.g. after remote rewrites). This is acceptable for NBS - // tablefiles, which are content-addressed and coordinated by the manifest. _, ok := gbs.cacheGetObject(key) return ok, nil } @@ -708,14 +726,23 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io. if err != nil { return nil, 0, "", err } - if err := gbs.syncForRead(ctx); err != nil { - return nil, 0, "", err + + // For non-manifest content-addressed keys, try the cache first (covers + // keys deferred via enqueuePendingWrite or already fetched). Only fetch from + // remote if the key isn't cached yet. + if key != gitblobstoreManifestKey { + if _, ok := gbs.cacheGetObject(key); ok { + return gbs.getFromCache(ctx, key, br) + } } - commit, err := gbs.resolveCommitForGet(ctx, key) - if err != nil { + + if err := gbs.syncForRead(ctx); err != nil { return nil, 0, "", err } + return gbs.getFromCache(ctx, key, br) +} +func (gbs *GitBlobstore) getFromCache(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { obj, ok := gbs.cacheGetObject(key) if !ok { return nil, 0, "", NotFound{Key: key} @@ -723,13 +750,13 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io. switch obj.typ { case git.ObjectTypeBlob: - sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, obj.oid) + sz, err := gbs.api.BlobSize(ctx, obj.oid) if err != nil { - return nil, 0, ver, err + return nil, 0, "", err } rc, err := gbs.api.BlobReader(ctx, obj.oid) if err != nil { - return nil, 0, ver, err + return nil, 0, "", err } // Per-key version: blob object id. 
return sliceInlineBlob(rc, sz, br, obj.oid.String()) @@ -820,60 +847,31 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie return parts, total, nil } -func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (commit git.OID, err error) { - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) - if err != nil { - return git.OID(""), err - } - if ok { - return commit, nil - } - - // If the ref doesn't exist, treat the manifest as missing (empty store), - // but surface a hard error for other keys: the store itself is missing. - if key == "manifest" { - return git.OID(""), NotFound{Key: key} - } - return git.OID(""), &git.RefNotFoundError{Ref: gbs.localRef} -} - -func (gbs *GitBlobstore) resolveBlobSizeForGet(ctx context.Context, commit git.OID, oid git.OID) (sz int64, ver string, err error) { - sz, err = gbs.api.BlobSize(ctx, oid) - if err != nil { - return 0, commit.String(), err - } - return sz, commit.String(), nil -} - func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) { key, err := normalizeGitTreePath(key) if err != nil { return "", err } - // Ensure the idempotent "key exists" fast-path observes remote state. - if err := gbs.syncForRead(ctx); err != nil { - return "", err + // For non-manifest keys, skip remote sync entirely: check cache, hash locally, + // and defer the write to be flushed in the next CheckAndPut("manifest"). + if key != gitblobstoreManifestKey { + if obj, ok := gbs.cacheGetObject(key); ok { + return obj.oid.String(), nil + } + plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) + if err != nil { + return "", err + } + return gbs.enqueuePendingWrite(key, plan), nil } - // Many NBS/table-file writes are content-addressed: if the key already exists, callers - // assume it refers to the same bytes and treat the operation as idempotent. 
- // - // GitBlobstore enforces that assumption by fast-succeeding when a non-manifest key - // already exists: it returns the existing per-key version and does not overwrite the - // key (and does not consume |reader|). - // - // The manifest is the main exception (it is mutable and updated via CheckAndPut). - if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil { + // Manifest key: fall through to existing remote-synced path. + if err := gbs.syncForRead(ctx); err != nil { return "", err - } else if ok { - return ver, nil } msg := fmt.Sprintf("gitblobstore: put %s", key) - - // Hash the contents once. If we need to retry due to a concurrent remote advance (lease failure), - // we can reuse the resulting object OIDs without re-reading |reader|. plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) if err != nil { return "", err @@ -881,6 +879,39 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r return gbs.putWithRemoteSync(ctx, key, plan, msg) } +// cacheUpdateForPlan updates the in-memory cache for a locally-hashed putPlan +// so that subsequent reads (e.g. Concatenate source resolution) can find the data. +func (gbs *GitBlobstore) cacheUpdateForPlan(key string, plan putPlan) { + gbs.cacheMu.Lock() + defer gbs.cacheMu.Unlock() + + if plan.chunked { + // For chunked writes, cache each part blob and the parent directory's children. + // Use the first part's OID as a placeholder tree OID for version consistency + // between Put and idempotent re-Put. The actual tree OID is computed at commit time. 
+ placeholderOID := git.OID("") + if len(plan.writes) > 0 { + placeholderOID = plan.writes[0].oid + } + gbs.mergeCacheObjectLocked(key, placeholderOID, git.ObjectTypeTree, false) + touchedDirs := make(map[string]struct{}) + for _, w := range plan.writes { + gbs.mergeCacheObjectLocked(w.path, w.oid, git.ObjectTypeBlob, false) + parent, base := splitGitPathParentBase(w.path) + if base != "" { + child := git.TreeEntry{Mode: "100644", Type: git.ObjectTypeBlob, OID: w.oid, Name: base} + if gbs.mergeCacheChildLocked(parent, child, false) { + touchedDirs[parent] = struct{}{} + } + } + } + gbs.sortCacheChildrenLocked(touchedDirs) + } else if len(plan.writes) == 1 { + w := plan.writes[0] + gbs.mergeCacheObjectLocked(w.path, w.oid, git.ObjectTypeBlob, false) + } +} + func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSize int64, reader io.Reader) (putPlan, error) { // Minimal policy: chunk only when explicitly enabled and |totalSize| exceeds MaxPartSize. if gbs.maxPartSize == 0 || totalSize <= 0 || uint64(totalSize) <= gbs.maxPartSize { @@ -891,10 +922,13 @@ func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSiz return putPlan{writes: []treeWrite{{path: key, oid: blobOID}}}, nil } - partOIDs, err := gbs.hashChunkedParts(ctx, reader) + _, partOIDs, _, err := gbs.hashParts(ctx, reader) if err != nil { return putPlan{}, err } + if len(partOIDs) == 0 { + return putPlan{}, fmt.Errorf("gitblobstore: chunked write for key %q produced no parts", key) + } writes := make([]treeWrite, 0, len(partOIDs)) for i, p := range partOIDs { @@ -904,19 +938,6 @@ func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSiz return putPlan{writes: writes, chunked: true}, nil } -func (gbs *GitBlobstore) hashChunkedParts(ctx context.Context, reader io.Reader) (partOIDs []git.OID, err error) { - max := int64(gbs.maxPartSize) - if max <= 0 { - return nil, fmt.Errorf("gitblobstore: invalid maxPartSize %d", gbs.maxPartSize) - } - - _, 
partOIDs, _, err = gbs.hashParts(ctx, reader) - if err != nil { - return nil, err - } - return partOIDs, nil -} - func (gbs *GitBlobstore) hashParts(ctx context.Context, reader io.Reader) (parts []chunkPartRef, partOIDs []git.OID, total uint64, err error) { max := int64(gbs.maxPartSize) if max <= 0 { @@ -964,7 +985,7 @@ func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff { return backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx) } -func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git.OID, hasParent bool, key string, plan putPlan, msg string) (git.OID, error) { +func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git.OID, hasParent bool, key string, plan putPlan, msg string, extraWrites []pendingWrite) (git.OID, error) { _, indexFile, cleanup, err := git.NewTempIndex() if err != nil { return "", err @@ -981,6 +1002,20 @@ func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git. } } + // Apply extra pending writes first (used when flushing deferred writes with manifest). 
+ for _, pw := range extraWrites { + if hasParent { + if err := gbs.removeKeyConflictsFromIndex(ctx, parent, indexFile, pw.key, pw.plan.chunked); err != nil { + return "", err + } + } + for _, w := range pw.plan.writes { + if err := gbs.api.UpdateIndexCacheInfo(ctx, indexFile, "100644", w.oid, w.path); err != nil { + return "", err + } + } + } + if hasParent { if err := gbs.removeKeyConflictsFromIndex(ctx, parent, indexFile, key, plan.chunked); err != nil { return "", err @@ -1052,26 +1087,6 @@ func (gbs *GitBlobstore) removeKeyConflictsFromIndex(ctx context.Context, parent } } -func (gbs *GitBlobstore) tryFastSucceedPutIfKeyExists(ctx context.Context, key string) (ver string, ok bool, err error) { - if key == "manifest" { - return "", false, nil - } - - _, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) - if err != nil { - return "", false, err - } - if !haveCommit { - return "", false, nil - } - obj, ok := gbs.cacheGetObject(key) - if !ok { - return "", false, nil - } - // Per-key version: existing object id. - return obj.oid.String(), true, nil -} - func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) { key, err := normalizeGitTreePath(key) if err != nil { @@ -1079,7 +1094,23 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s } msg := fmt.Sprintf("gitblobstore: checkandput %s", key) - return gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg) + + // For the manifest key, flush all pending writes in a single commit+push. + if key == gitblobstoreManifestKey { + gbs.mu.Lock() + pending := gbs.pendingWrites + gbs.pendingWrites = nil + gbs.mu.Unlock() + ver, err := gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg, pending) + if err != nil && len(pending) > 0 { + gbs.mu.Lock() + gbs.pendingWrites = append(pending, gbs.pendingWrites...) 
+ gbs.mu.Unlock() + } + return ver, err + } + + return gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg, nil) } func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, haveCommit bool, key string) (string, error) { @@ -1101,9 +1132,6 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] if err != nil { return "", err } - if err := gbs.syncForRead(ctx); err != nil { - return "", err - } if len(sources) == 0 { return "", fmt.Errorf("gitblobstore: concatenate requires at least one source") } @@ -1117,31 +1145,56 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] } sources = normSources - // For non-manifest keys, match Put's behavior: if the key already exists, succeed without overwriting. - if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil { + // For non-manifest keys, skip remote sync and defer the write. + if key != gitblobstoreManifestKey { + return gbs.concatenateDeferred(ctx, key, sources) + } + + // Manifest key: fall through to existing remote-synced path. + if err := gbs.syncForRead(ctx); err != nil { return "", err - } else if ok { - return ver, nil } - // Resolve a snapshot commit for the sources. commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { return "", err } if !ok { - // Consistent with Get: empty store => manifest missing, other keys => ref missing. - if key == "manifest" { - return "", NotFound{Key: key} - } - return "", &git.RefNotFoundError{Ref: gbs.localRef} + return "", NotFound{Key: key} } - totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources) + plan, err := gbs.planConcatenation(ctx, key, sources, commit) + if err != nil { + return "", err + } + + msg := fmt.Sprintf("gitblobstore: concatenate %s", key) + return gbs.putWithRemoteSync(ctx, key, plan, msg) +} + +// concatenateDeferred handles Concatenate for non-manifest keys without remote sync. 
+// Sources are read from the local cache/git objects (populated by prior Puts). +func (gbs *GitBlobstore) concatenateDeferred(ctx context.Context, key string, sources []string) (string, error) { + // Fast-succeed if key already in cache. + if obj, ok := gbs.cacheGetObject(key); ok { + return obj.oid.String(), nil + } + + plan, err := gbs.planConcatenation(ctx, key, sources, "") if err != nil { return "", err } + return gbs.enqueuePendingWrite(key, plan), nil +} + +// planConcatenation computes the total size of sources, opens a concatenated +// reader, and returns a putPlan for the result key. +func (gbs *GitBlobstore) planConcatenation(ctx context.Context, key string, sources []string, commit git.OID) (putPlan, error) { + totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources) + if err != nil { + return putPlan{}, err + } rc := &concatReadCloser{ ctx: ctx, keys: sources, @@ -1150,14 +1203,17 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] }, } defer rc.Close() + return gbs.planPutWrites(ctx, key, totalSz, rc) +} - plan, err := gbs.planPutWrites(ctx, key, totalSz, rc) - if err != nil { - return "", err - } - - msg := fmt.Sprintf("gitblobstore: concatenate %s", key) - return gbs.putWithRemoteSync(ctx, key, plan, msg) +// enqueuePendingWrite appends a deferred write to pendingWrites, updates the +// cache optimistically, and returns a synthetic version string. 
+func (gbs *GitBlobstore) enqueuePendingWrite(key string, plan putPlan) string { + gbs.mu.Lock() + gbs.pendingWrites = append(gbs.pendingWrites, pendingWrite{key: key, plan: plan}) + gbs.mu.Unlock() + gbs.cacheUpdateForPlan(key, plan) + return plan.writes[0].oid.String() } func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) { diff --git a/go/store/blobstore/git_blobstore_chunked_put_test.go b/go/store/blobstore/git_blobstore_chunked_put_test.go index 6c43adc7853..765f6d62233 100644 --- a/go/store/blobstore/git_blobstore_chunked_put_test.go +++ b/go/store/blobstore/git_blobstore_chunked_put_test.go @@ -50,12 +50,17 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) { want := []byte("abcdefghij") // 10 bytes -> 3,3,3,1 ver, err := bs.Put(ctx, "big", int64(len(want)), bytes.NewReader(want)) require.NoError(t, err) + require.NotEmpty(t, ver) - got, ver2, err := GetBytes(ctx, bs, "big", AllRange) + // Non-manifest Put is deferred; data should be readable from cache/local git objects. + got, _, err := GetBytes(ctx, bs, "big", AllRange) require.NoError(t, err) - require.Equal(t, ver, ver2) require.Equal(t, want, got) + // Flush deferred writes via CheckAndPut("manifest"). + _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) @@ -101,20 +106,14 @@ func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testi require.NoError(t, err) api := git.NewGitAPIImpl(runner) - // blob stays blob (even if the caller would have triggered chunked mode) + // blob stays blob (even if the caller would have triggered chunked mode). + // Non-manifest Put is deferred; idempotent re-Put should return same version from cache. 
verBlob, err := bs.Put(ctx, "k", 2, bytes.NewReader([]byte("hi"))) require.NoError(t, err) verNoop, err := bs.Put(ctx, "k", 10, putShouldNotRead{}) require.NoError(t, err) require.Equal(t, verBlob, verNoop) - head1, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) - require.NoError(t, err) - require.True(t, ok) - _, typ, err := api.ResolvePathObject(ctx, head1, "k") - require.NoError(t, err) - require.Equal(t, git.ObjectTypeBlob, typ) - got, _, err := GetBytes(ctx, bs, "k", AllRange) require.NoError(t, err) require.Equal(t, []byte("hi"), got) @@ -122,13 +121,6 @@ func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testi // tree stays tree verTree, err := bs.Put(ctx, "ktree", 10, bytes.NewReader([]byte("abcdefghij"))) require.NoError(t, err) - head2, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) - require.NoError(t, err) - require.True(t, ok) - _, typ, err = api.ResolvePathObject(ctx, head2, "ktree") - require.NoError(t, err) - require.Equal(t, git.ObjectTypeTree, typ) - verTreeNoop, err := bs.Put(ctx, "ktree", 2, putShouldNotRead{}) require.NoError(t, err) require.Equal(t, verTree, verTreeNoop) @@ -136,4 +128,20 @@ func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testi got, _, err = GetBytes(ctx, bs, "ktree", AllRange) require.NoError(t, err) require.Equal(t, []byte("abcdefghij"), got) + + // Flush deferred writes and verify remote state. 
+ _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + + head, ok, err := api.TryResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + require.True(t, ok) + + _, typ, err := api.ResolvePathObject(ctx, head, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + + _, typ, err = api.ResolvePathObject(ctx, head, "ktree") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeTree, typ) } diff --git a/go/store/blobstore/git_blobstore_helpers_test.go b/go/store/blobstore/git_blobstore_helpers_test.go index 85e480ed7d8..6dd49fea38a 100644 --- a/go/store/blobstore/git_blobstore_helpers_test.go +++ b/go/store/blobstore/git_blobstore_helpers_test.go @@ -108,86 +108,6 @@ func (f fakeGitAPI) PushRefWithLease(ctx context.Context, remote string, srcRef return f.pushRefWithLease(ctx, remote, srcRef, dstRef, expectedDstOID) } -func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { - ctx := context.Background() - - t.Run("ok", func(t *testing.T) { - api := fakeGitAPI{ - tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { - require.Equal(t, DoltDataRef, ref) - return git.OID("0123456789abcdef0123456789abcdef01234567"), true, nil - }, - } - gbs := &GitBlobstore{localRef: DoltDataRef, api: api} - - commit, err := gbs.resolveCommitForGet(ctx, "k") - require.NoError(t, err) - require.Equal(t, git.OID("0123456789abcdef0123456789abcdef01234567"), commit) - }) - - t.Run("missingRef_manifestIsNotFound", func(t *testing.T) { - api := fakeGitAPI{ - tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { - return git.OID(""), false, nil - }, - } - gbs := &GitBlobstore{localRef: DoltDataRef, api: api} - - _, err := gbs.resolveCommitForGet(ctx, "manifest") - var nf NotFound - require.ErrorAs(t, err, &nf) - require.Equal(t, "manifest", nf.Key) - }) - - t.Run("missingRef_nonManifestIsRefNotFound", func(t *testing.T) { - api := 
fakeGitAPI{ - tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { - return git.OID(""), false, nil - }, - } - gbs := &GitBlobstore{localRef: DoltDataRef, api: api} - - _, err := gbs.resolveCommitForGet(ctx, "somekey") - var rnf *git.RefNotFoundError - require.ErrorAs(t, err, &rnf) - require.Equal(t, DoltDataRef, rnf.Ref) - }) - - t.Run("propagatesError", func(t *testing.T) { - sentinel := errors.New("boom") - api := fakeGitAPI{ - tryResolveRefCommit: func(ctx context.Context, ref string) (git.OID, bool, error) { - return git.OID(""), false, sentinel - }, - } - gbs := &GitBlobstore{localRef: DoltDataRef, api: api} - - _, err := gbs.resolveCommitForGet(ctx, "k") - require.ErrorIs(t, err, sentinel) - }) -} - -func TestGitBlobstoreHelpers_resolveBlobSizeForGet(t *testing.T) { - ctx := context.Background() - commit := git.OID("0123456789abcdef0123456789abcdef01234567") - oid := git.OID("89abcdef0123456789abcdef0123456789abcdef") - - t.Run("ok", func(t *testing.T) { - api := fakeGitAPI{ - blobSize: func(ctx context.Context, gotOID git.OID) (int64, error) { - require.Equal(t, oid, gotOID) - return 123, nil - }, - } - gbs := &GitBlobstore{api: api} - - sz, ver, err := gbs.resolveBlobSizeForGet(ctx, commit, oid) - require.NoError(t, err) - require.Equal(t, commit.String(), ver) - require.Equal(t, int64(123), sz) - }) -} - func TestGitBlobstoreHelpers_validateAndSizeChunkedParts(t *testing.T) { ctx := context.Background() diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index 65e515a323b..80a0359552e 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -311,6 +311,10 @@ func TestGitBlobstore_RemoteManaged_PutPushesToRemote(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, ver) + // Non-manifest Put is deferred; flush via CheckAndPut("manifest"). 
+ _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) remoteAPI := git.NewGitAPIImpl(remoteRunner) @@ -344,6 +348,10 @@ func TestGitBlobstore_RemoteManaged_PutBootstrapsEmptyRemote(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, ver) + // Non-manifest Put is deferred; flush via CheckAndPut("manifest"). + _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + // Remote should now have refs/dolt/data and contain the key. remoteRunner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) @@ -420,10 +428,14 @@ func TestGitBlobstore_RemoteManaged_PutRetriesOnLeaseFailure(t *testing.T) { }, } + // Put is deferred (no push). The retry happens during CheckAndPut flush. ver, err := PutBytes(ctx, bs, "k", []byte("after retry\n")) require.NoError(t, err) require.NotEmpty(t, ver) + _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) require.NoError(t, err) @@ -609,6 +621,10 @@ func TestGitBlobstore_RemoteManaged_PutOverwritesDivergedLocalRef_NoMergeCommit( _, err = PutBytes(ctx, bs, "k", []byte("from local\n")) require.NoError(t, err) + // Non-manifest Put is deferred; flush via CheckAndPut("manifest"). + _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + remoteHeadAfter, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) require.NoError(t, err) @@ -813,23 +829,28 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, ver) - // Verify the resulting key is stored as a chunked tree (not a single blob). 
- head, ok, err := bs.api.TryResolveRefCommit(ctx, bs.localRef) + // Non-manifest writes are deferred; flush via CheckAndPut("manifest"). + _, err = bs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + + // Verify the resulting key is stored as a chunked tree on the remote. + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + head, ok, err := remoteAPI.TryResolveRefCommit(ctx, DoltDataRef) require.NoError(t, err) require.True(t, ok) - oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c") + _, typ, err := remoteAPI.ResolvePathObject(ctx, head, "c") require.NoError(t, err) require.Equal(t, git.ObjectTypeTree, typ) - require.Equal(t, oid.String(), ver) - parts, err := bs.api.ListTree(ctx, head, "c") + parts, err := remoteAPI.ListTree(ctx, head, "c") require.NoError(t, err) require.GreaterOrEqual(t, len(parts), 2) require.Equal(t, "0001", parts[0].Name) - got, ver2, err := GetBytes(ctx, bs, "c", AllRange) + got, _, err := GetBytes(ctx, bs, "c", AllRange) require.NoError(t, err) - require.Equal(t, ver, ver2) require.Equal(t, want, got) } diff --git a/go/store/blobstore/internal/git/impl.go b/go/store/blobstore/internal/git/impl.go index e3641a5651c..32e2c32d1ee 100644 --- a/go/store/blobstore/internal/git/impl.go +++ b/go/store/blobstore/internal/git/impl.go @@ -320,7 +320,11 @@ func (a *GitAPIImpl) FetchRef(ctx context.Context, remote string, srcRef string, // Forced refspec to keep tracking refs in sync with remote truth. srcRef = strings.TrimPrefix(srcRef, "+") refspec := "+" + srcRef + ":" + dstRef - _, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", remote, refspec) + // --refmap="" prevents git from also applying the remote's configured + // fetch refspecs. Without this, stale tracking refs from default refspecs + // can cause directory/file conflicts that make git exit 1 even when our + // specific refspec succeeds. 
+ _, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", "--refmap=", remote, refspec) if err != nil && isRemoteRefNotFoundErr(err) { return &RefNotFoundError{Ref: srcRef} } diff --git a/go/store/nbs/git_blobstore_empty_remote_test.go b/go/store/nbs/git_blobstore_empty_remote_test.go index 09dc2fa9b0a..89340c1efe7 100644 --- a/go/store/nbs/git_blobstore_empty_remote_test.go +++ b/go/store/nbs/git_blobstore_empty_remote_test.go @@ -147,6 +147,10 @@ func TestNBS_NewGitStore_DefaultsMaxPartSizeTo50MB(t *testing.T) { _, err = gbs.Put(ctx, "k", int64(50*1024*1024+1), bytes.NewReader([]byte("hi"))) require.NoError(t, err) + // Non-manifest Put is deferred; flush via CheckAndPut("manifest"). + _, err = gbs.CheckAndPut(ctx, "", "manifest", 3, bytes.NewReader([]byte("xxx\n"))) + require.NoError(t, err) + // On the remote, key "k" should be represented as a tree containing part "0001". cmd = exec.CommandContext(ctx, "git", "--git-dir", remoteRepo.GitDir, "rev-parse", "--verify", "--quiet", blobstore.DoltDataRef+"^{commit}") headBytes, err := cmd.CombinedOutput()