diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go index 428c6a80ee6..94f1ac0d2f2 100644 --- a/go/libraries/doltcore/sqle/database_provider.go +++ b/go/libraries/doltcore/sqle/database_provider.go @@ -71,6 +71,11 @@ type DoltDatabaseProvider struct { droppedDatabaseManager *droppedDatabaseManager overrides sql.EngineOverrides + // remoteDbs caches remote DoltDB instances by URL so that repeated push calls + // to the same remote reuse the store (and its already-opened table chunk sources) + // instead of re-opening every table file from the blobstore each time. + remoteDbs map[string]*doltdb.DoltDB + defaultBranch string dbFactoryUrl string DropDatabaseHooks []DropDatabaseHook @@ -181,6 +186,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files isStandby: new(bool), droppedDatabaseManager: newDroppedDatabaseManager(fs), overrides: overrides, + remoteDbs: make(map[string]*doltdb.DoltDB), }, nil } @@ -294,6 +300,20 @@ func (p *DoltDatabaseProvider) Close() { } } } + + // Close cached remote databases. + var remoteDbs []*doltdb.DoltDB + func() { + p.mu.RLock() + defer p.mu.RUnlock() + remoteDbs = make([]*doltdb.DoltDB, 0, len(p.remoteDbs)) + for _, rdb := range p.remoteDbs { + remoteDbs = append(remoteDbs, rdb) + } + }() + for _, rdb := range remoteDbs { + _ = rdb.Close() + } } // Installs an InitDatabaseHook which configures new databases--those @@ -539,7 +559,43 @@ func (p *DoltDatabaseProvider) GetRemoteDB(ctx context.Context, format *types.No } if withCaching { - return r.GetRemoteDB(ctx, format, dialer) + // Only cache git-backed remote DBs. Other remote types (file://, aws, etc.) + // register their underlying NBS in a global singleton cache that is closed + // separately by CloseAllLocalDatabases(). Caching those here would cause a + // double-close panic on process exit. + isGitRemote := strings.HasPrefix(strings.ToLower(r.Url), "git+") + if isGitRemote { + cached := func() *doltdb.DoltDB { + p.mu.RLock() + defer p.mu.RUnlock() + return p.remoteDbs[r.Url] + }() + if cached != nil { + return cached, nil + } + } + + remoteDB, err := r.GetRemoteDB(ctx, format, dialer) + if err != nil { + return nil, err + } + + if isGitRemote { + cached := func() *doltdb.DoltDB { + p.mu.Lock() + defer p.mu.Unlock() + if existing, ok := p.remoteDbs[r.Url]; ok { + return existing + } + p.remoteDbs[r.Url] = remoteDB + return nil + }() + if cached != nil { + _ = remoteDB.Close() + return cached, nil + } + } + return remoteDB, nil } return r.GetRemoteDBWithoutCaching(ctx, format, dialer) } diff --git a/go/store/blobstore/git_blobstore.go b/go/store/blobstore/git_blobstore.go index 5a3cf83285b..959d9fcc187 100644 --- a/go/store/blobstore/git_blobstore.go +++ b/go/store/blobstore/git_blobstore.go @@ -21,6 +21,8 @@ import ( "fmt" "io" "math" + "os" + "path/filepath" "sort" "strconv" "strings" @@ -28,6 +30,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/dolthub/fslock" "github.com/google/uuid" git "github.com/dolthub/dolt/go/store/blobstore/internal/git" @@ -275,7 +278,12 @@ type GitBlobstore struct { api git.GitAPI remoteName string remoteTrackingRef string - mu sync.Mutex + // writeMu serializes operations that mutate shared git refs and/or perform + // remote-managed write workflows (commit + push + cache update). + writeMu sync.Mutex + // pendingMu guards pendingWrites, which may be appended to by non-manifest Put/Concatenate + // while being flushed by CheckAndPut("manifest"). + pendingMu sync.Mutex // identity, when non-nil, is used as the author/committer identity for new commits. // When nil, we prefer whatever identity git derives from env/config, falling back // to a deterministic default only if git reports the identity is missing. @@ -290,7 +298,7 @@ type GitBlobstore struct { // pendingWrites accumulates non-manifest writes that will be flushed in a single // commit+push when CheckAndPut("manifest") is called. This avoids per-key // fetch/commit/push cycles for content-addressed (immutable) table file blobs. - // Guarded by gbs.mu. + // Guarded by gbs.pendingMu. pendingWrites []pendingWrite // cacheMu guards all cache fields below. @@ -303,6 +311,13 @@ type GitBlobstore struct { // cacheChildren maps a directory path to its immediate children entries. The // entries' Name field is the base name (not a full path). cacheChildren map[string][]git.TreeEntry + // lastSyncedAt is set after a successful syncForRead or fetchAlignAndMergeForWrite + // (via mergeCacheFromHead). Used to skip redundant back-to-back fetches. + lastSyncedAt time.Time + // syncForReadTTL controls how long a recent sync remains valid. When non-zero, + // syncForRead will skip the fetch if the last sync completed within this duration. + // Defaults to defaultSyncForReadTTL. Set to 0 to disable dedup (useful in tests). + syncForReadTTL time.Duration } var _ Blobstore = (*GitBlobstore)(nil) @@ -356,6 +371,12 @@ type GitBlobstoreOptions struct { } // NewGitBlobstoreWithOptions creates a GitBlobstore rooted at |gitDir| and |ref|. +// defaultSyncForReadTTL is the default dedup window for syncForRead. Back-to-back +// reads within this window reuse the cached state instead of re-fetching from remote. +// The write path (fetchAlignAndMergeForWrite) always does its own unconditional fetch, +// so this only affects read-path callers (Get/Exists). +const defaultSyncForReadTTL = 1 * time.Second + func NewGitBlobstoreWithOptions(gitDir, ref string, opts GitBlobstoreOptions) (*GitBlobstore, error) { r, err := git.NewRunner(gitDir) if err != nil { @@ -383,6 +404,7 @@ func NewGitBlobstoreWithOptions(gitDir, ref string, opts GitBlobstoreOptions) (* maxPartSize: opts.MaxPartSize, cacheObjects: make(map[string]cachedGitObject), cacheChildren: make(map[string][]git.TreeEntry), + syncForReadTTL: defaultSyncForReadTTL, }, nil } @@ -490,6 +512,7 @@ func (gbs *GitBlobstore) mergeCacheFromHead(ctx context.Context, head git.OID) e gbs.sortCacheChildrenLocked(touchedDirs) gbs.cacheHead = head + gbs.lastSyncedAt = time.Now() gbs.cacheMu.Unlock() return nil } @@ -510,8 +533,8 @@ func (gbs *GitBlobstore) validateRemoteManaged() error { // This is an optional hygiene API: by default, UUID-owned local refs may accumulate in the // repo over time. Callers that care about cleanup (e.g. tests) may invoke this explicitly. func (gbs *GitBlobstore) CleanupOwnedLocalRef(ctx context.Context) error { - gbs.mu.Lock() - defer gbs.mu.Unlock() + gbs.writeMu.Lock() + defer gbs.writeMu.Unlock() _, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { @@ -524,12 +547,18 @@ func (gbs *GitBlobstore) CleanupOwnedLocalRef(ctx context.Context) error { return err } -// Close best-effort deletes this instance's UUID-owned refs. +// Close best-effort deletes this instance's UUID-owned refs and +// periodically runs git gc to repack the cache repository. func (gbs *GitBlobstore) Close() error { ctx := context.Background() - gbs.mu.Lock() - defer gbs.mu.Unlock() + // Best-effort periodic GC to repack the cache repo. Runs outside the + // write lock so a slow gc cannot serialize other writers. maybeRunGC + // has its own file-based lock for cross-process coordination. + gbs.maybeRunGC() + + gbs.writeMu.Lock() + defer gbs.writeMu.Unlock() deleteIfExists := func(ref string) error { if ref == "" { @@ -552,15 +581,66 @@ func (gbs *GitBlobstore) Close() error { ) } +const gcInterval = 24 * time.Hour + +// maxParentedCommits is the number of consecutive parented commits before +// we create a parentless commit to sever the history chain. This bounds +// reachable history so git gc can prune old objects, while still giving +// git push enough parents for efficient incremental delta computation. +const maxParentedCommits = 64 + +// maybeRunGC runs git gc --prune=now if >24h have elapsed since the last GC. +// Uses a file lock so only one process GCs at a time, and a marker file whose +// mtime records when GC last completed. All errors are silently ignored — GC +// is an optimization, not a correctness requirement. +func (gbs *GitBlobstore) maybeRunGC() { + markerPath := filepath.Join(gbs.gitDir, ".dolt-gc-last") + if info, err := os.Stat(markerPath); err == nil { + if time.Since(info.ModTime()) < gcInterval { + return + } + } + + lockPath := filepath.Join(gbs.gitDir, ".dolt-gc.lock") + lck := fslock.New(lockPath) + if err := lck.LockWithTimeout(0); err != nil { + return // another process holds the lock, skip + } + defer lck.Unlock() + + // Re-check after acquiring lock — another instance may have GC'd. + if info, err := os.Stat(markerPath); err == nil { + if time.Since(info.ModTime()) < gcInterval { + return + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + _, _ = gbs.runner.Run(ctx, git.RunOptions{}, "gc", "--prune=now") + _ = os.WriteFile(markerPath, nil, 0644) +} + func (gbs *GitBlobstore) syncForRead(ctx context.Context) error { if err := gbs.validateRemoteManaged(); err != nil { return err } - gbs.mu.Lock() - defer gbs.mu.Unlock() - // 1) Fetch remote ref into our remote-tracking ref. - if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil { + // Dedup guard: skip the fetch if we synced recently. The write path + // (fetchAlignAndMergeForWrite) always does its own unconditional fetch, + // so this only affects read-path callers (Get/Exists). + if ttl := gbs.syncForReadTTL; ttl > 0 { + gbs.cacheMu.RLock() + sinceLast := time.Since(gbs.lastSyncedAt) + gbs.cacheMu.RUnlock() + if sinceLast < ttl { + return nil + } + } + + // Fetch remote ref into our remote-tracking ref. + err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef) + if err != nil { // An absent remote ref is treated as an empty store. This is required for NBS open // (manifest ParseIfExists) against a freshly-initialized remote. var rnf *git.RefNotFoundError @@ -578,12 +658,7 @@ func (gbs *GitBlobstore) syncForRead(ctx context.Context) error { return &git.RefNotFoundError{Ref: gbs.remoteTrackingRef} } - // 2) Force-set owned local ref to remote head (no merge; remote is source-of-truth). - if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync read"); err != nil { - return err - } - - // 3) Merge cache to reflect fetched contents. + // Merge cache to reflect fetched contents. return gbs.mergeCacheFromHead(ctx, remoteHead) } @@ -597,11 +672,6 @@ func (e *gitblobstoreFetchRefError) Unwrap() error { return e.err } func (gbs *GitBlobstore) fetchAlignAndMergeForWrite(ctx context.Context) (remoteHead git.OID, ok bool, err error) { if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil { // If the remote ref is missing, treat this as an empty store and bootstrap on write. - // Note: there is no "empty ref" in Git; this means the ref is unborn (no commits yet). - // Callers will see ok=false and parent=="" and will: - // - build a root commit from an empty tree (no parent), - // - create/update gbs.localRef to that commit, and - // - push with an empty expected dst OID, which creates gbs.remoteRef on the remote. var rnf *git.RefNotFoundError if errors.As(err, &rnf) && rnf.Ref == gbs.remoteRef { return "", false, nil @@ -634,8 +704,9 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string if err := gbs.validateRemoteManaged(); err != nil { return "", err } - gbs.mu.Lock() - defer gbs.mu.Unlock() + + gbs.writeMu.Lock() + defer gbs.writeMu.Unlock() policy := gbs.casRetryPolicy(ctx) @@ -655,6 +726,7 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string if err != nil { return backoff.Permanent(err) } + if err := gbs.api.UpdateRef(ctx, gbs.localRef, newCommit, msg); err != nil { return backoff.Permanent(err) } @@ -895,10 +967,6 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r } // Manifest key: fall through to existing remote-synced path. - if err := gbs.syncForRead(ctx); err != nil { - return "", err - } - msg := fmt.Sprintf("gitblobstore: put %s", key) plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) if err != nil { @@ -1062,11 +1130,21 @@ func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git. return "", err } - // Snapshot-only semantics: create commits with no parent so old snapshots become unreachable - // once refs advance (enables pruning / avoids cache history growth). - commitOID, err := gbs.api.CommitTree(ctx, treeOID, nil, msg, gbs.identity) + // Use parent commit when available so git push can compute incremental deltas + // instead of enumerating the full tree. After maxParentedCommits in the + // existing chain, create a parentless commit to sever history so git gc can + // prune old objects. + var parentPtr *git.OID + if hasParent && parent != "" { + depth, err := gbs.api.RevListCount(ctx, parent, maxParentedCommits+1) + if err == nil && depth < maxParentedCommits { + p := parent + parentPtr = &p + } + } + commitOID, err := gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, gbs.identity) if err != nil && gbs.identity == nil && isMissingGitIdentityErr(err) { - commitOID, err = gbs.api.CommitTree(ctx, treeOID, nil, msg, defaultGitBlobstoreIdentity()) + commitOID, err = gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, defaultGitBlobstoreIdentity()) } if err != nil { return "", err @@ -1121,15 +1199,15 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s // For the manifest key, flush all pending writes in a single commit+push. if key == gitblobstoreManifestKey { - gbs.mu.Lock() + gbs.pendingMu.Lock() pending := gbs.pendingWrites gbs.pendingWrites = nil - gbs.mu.Unlock() + gbs.pendingMu.Unlock() ver, err := gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg, pending) if err != nil && len(pending) > 0 { - gbs.mu.Lock() + gbs.pendingMu.Lock() gbs.pendingWrites = append(pending, gbs.pendingWrites...) - gbs.mu.Unlock() + gbs.pendingMu.Unlock() } return ver, err } @@ -1150,7 +1228,6 @@ func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, } func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) { - // Keep key validation for consistent error behavior. var err error key, err = normalizeGitTreePath(key) if err != nil { @@ -1174,26 +1251,17 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] return gbs.concatenateDeferred(ctx, key, sources) } - // Manifest key: fall through to existing remote-synced path. - if err := gbs.syncForRead(ctx); err != nil { - return "", err - } - - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) - if err != nil { - return "", err - } - if !ok { - return "", NotFound{Key: key} - } - - plan, err := gbs.planConcatenation(ctx, key, sources, commit) - if err != nil { - return "", err - } - msg := fmt.Sprintf("gitblobstore: concatenate %s", key) - return gbs.putWithRemoteSync(ctx, key, plan, msg) + return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, okRemote bool) (git.OID, error) { + if !okRemote { + return "", NotFound{Key: key} + } + plan, err := gbs.planConcatenation(ctx, key, sources, remoteHead) + if err != nil { + return "", err + } + return gbs.buildCommitForKeyWrite(ctx, remoteHead, okRemote, key, plan, msg, nil) + }) } // concatenateDeferred handles Concatenate for non-manifest keys without remote sync. @@ -1233,9 +1301,9 @@ func (gbs *GitBlobstore) planConcatenation(ctx context.Context, key string, sour // enqueuePendingWrite appends a deferred write to pendingWrites, updates the // cache optimistically, and returns a synthetic version string. func (gbs *GitBlobstore) enqueuePendingWrite(key string, plan putPlan) string { - gbs.mu.Lock() + gbs.pendingMu.Lock() gbs.pendingWrites = append(gbs.pendingWrites, pendingWrite{key: key, plan: plan}) - gbs.mu.Unlock() + gbs.pendingMu.Unlock() gbs.cacheUpdateForPlan(key, plan) return plan.writes[0].oid.String() } diff --git a/go/store/blobstore/git_blobstore_cache_merge_semantics_test.go b/go/store/blobstore/git_blobstore_cache_merge_semantics_test.go index 160a6b7e9df..fa6e4d2834e 100644 --- a/go/store/blobstore/git_blobstore_cache_merge_semantics_test.go +++ b/go/store/blobstore/git_blobstore_cache_merge_semantics_test.go @@ -72,6 +72,8 @@ func TestGitBlobstore_CacheMerge_ManifestUpdatesAcrossFetches(t *testing.T) { RemoteName: "origin", }) require.NoError(t, err) + // Disable syncForRead dedup so the second read sees the remote mutation immediately. + bs.syncForReadTTL = 0 got1, ver1, err := GetBytes(ctx, bs, "manifest", AllRange) require.NoError(t, err) diff --git a/go/store/blobstore/git_blobstore_helpers_test.go b/go/store/blobstore/git_blobstore_helpers_test.go index 6dd49fea38a..95d27dc928f 100644 --- a/go/store/blobstore/git_blobstore_helpers_test.go +++ b/go/store/blobstore/git_blobstore_helpers_test.go @@ -89,6 +89,9 @@ func (f fakeGitAPI) WriteTree(ctx context.Context, indexFile string) (git.OID, e func (f fakeGitAPI) CommitTree(ctx context.Context, tree git.OID, parent *git.OID, message string, author *git.Identity) (git.OID, error) { panic("unexpected call") } +func (f fakeGitAPI) RevListCount(ctx context.Context, oid git.OID, maxCount int) (int, error) { + panic("unexpected call") +} func (f fakeGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error { panic("unexpected call") } diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index cba2a0cf255..16b2208ee40 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -23,8 +23,10 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" @@ -130,7 +132,7 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { _ = rc.Close() } -func TestGitBlobstore_RemoteManaged_ExistsFetchesAndAligns(t *testing.T) { +func TestGitBlobstore_RemoteManaged_ExistsFetchesAndTracks(t *testing.T) { requireGitOnPath(t) ctx := context.Background() @@ -165,9 +167,10 @@ func TestGitBlobstore_RemoteManaged_ExistsFetchesAndAligns(t *testing.T) { require.True(t, strings.HasPrefix(bs.localRef, "refs/dolt/blobstore/origin/dolt/data/")) localAPI := git.NewGitAPIImpl(localRunner) - gotLocal, err := localAPI.ResolveRefCommit(ctx, bs.localRef) - require.NoError(t, err) - require.Equal(t, git.OID(remoteCommit), gotLocal) + _, err = localAPI.ResolveRefCommit(ctx, bs.localRef) + var rnf *git.RefNotFoundError + require.ErrorAs(t, err, &rnf) + require.Equal(t, bs.localRef, rnf.Ref) gotTracking, err := localAPI.ResolveRefCommit(ctx, bs.remoteTrackingRef) require.NoError(t, err) @@ -241,12 +244,15 @@ func TestGitBlobstore_TwoInstances_IndependentTrackingRefs(t *testing.T) { require.NoError(t, err) require.Equal(t, head1, head2, "both should track the same remote commit") - // Verify local refs are also distinct and valid. - local1, err := localAPI.ResolveRefCommit(ctx, bs1.localRef) - require.NoError(t, err) - local2, err := localAPI.ResolveRefCommit(ctx, bs2.localRef) - require.NoError(t, err) - require.Equal(t, local1, local2, "both should point at the same commit") + // Local refs are distinct but are not created by read sync. + _, err = localAPI.ResolveRefCommit(ctx, bs1.localRef) + var rnf *git.RefNotFoundError + require.ErrorAs(t, err, &rnf) + require.Equal(t, bs1.localRef, rnf.Ref) + _, err = localAPI.ResolveRefCommit(ctx, bs2.localRef) + rnf = nil + require.ErrorAs(t, err, &rnf) + require.Equal(t, bs2.localRef, rnf.Ref) } func TestGitBlobstore_CleanupOwnedLocalRef_DeletesRef(t *testing.T) { @@ -301,11 +307,18 @@ func TestGitBlobstore_Close_DeletesOwnedLocalAndTrackingRefs(t *testing.T) { require.True(t, ok) localAPI := git.NewGitAPIImpl(localRunner) - _, err = localAPI.ResolveRefCommit(ctx, bs.localRef) - require.NoError(t, err) _, err = localAPI.ResolveRefCommit(ctx, bs.remoteTrackingRef) require.NoError(t, err) + // Read sync no longer creates/aligns the local ref. Seed it so we can + // validate Close deletes it when present. + _, err = localRepo.SetRefToTree(ctx, bs.localRef, map[string][]byte{ + "manifest": []byte("seed localRef\n"), + }, "seed localRef") + require.NoError(t, err) + _, err = localAPI.ResolveRefCommit(ctx, bs.localRef) + require.NoError(t, err) + require.NoError(t, bs.Close()) _, err = localAPI.ResolveRefCommit(ctx, bs.localRef) @@ -476,11 +489,11 @@ func TestGitBlobstore_RemoteManaged_PutRetriesOnLeaseFailure(t *testing.T) { remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) require.NoError(t, err) - // Snapshot-only semantics: new remote head should be a non-merge commit with no parent. + // The commit should have a single parent (the external writer's commit that caused the retry). if v := externalHead.Load(); v != nil { out, err := remoteRunner.Run(ctx, git.RunOptions{}, "cat-file", "-p", remoteHead.String()) require.NoError(t, err) - require.NotContains(t, string(out), "\nparent ") + require.Contains(t, string(out), "\nparent ") } oid, typ, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "k") @@ -495,6 +508,99 @@ func TestGitBlobstore_RemoteManaged_PutRetriesOnLeaseFailure(t *testing.T) { require.Equal(t, []byte("after retry\n"), got) } +func TestGitBlobstore_RemoteManaged_ManifestReadsDoNotBlockDuringPush(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + remoteHead, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "manifest": []byte("seed\n"), + }, "seed") + require.NoError(t, err) + require.NotEmpty(t, remoteHead) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + // Prime the manifest version so our write uses the expected version. + _, ver, err := GetBytes(ctx, bs, "manifest", AllRange) + require.NoError(t, err) + require.NotEmpty(t, ver) + + startedPush := make(chan struct{}) + releasePush := make(chan struct{}) + + // Block the first push while holding the writer lock, then ensure manifest reads + // can still proceed (they should not wait on the writer lock anymore). + bs.api = &hookPushGitAPI{ + GitAPI: bs.api, + onFirstPush: func(ctx context.Context) { + close(startedPush) + <-releasePush + }, + } + + writeErr := make(chan error, 1) + go func() { + _, err := PutBytes(ctx, bs, "k", []byte("from local\n")) + if err != nil { + writeErr <- err + return + } + _, err = bs.CheckAndPut(ctx, ver, "manifest", int64(len("next\n")), bytes.NewReader([]byte("next\n"))) + writeErr <- err + }() + + select { + case <-startedPush: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for push to start") + } + + const readers = 25 + var wg sync.WaitGroup + readErrs := make(chan error, readers) + for range readers { + wg.Add(1) + go func() { + defer wg.Done() + rctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + _, _, err := GetBytes(rctx, bs, "manifest", AllRange) + readErrs <- err + }() + } + wg.Wait() + close(readErrs) + + for err := range readErrs { + require.NoError(t, err) + } + + close(releasePush) + require.NoError(t, <-writeErr) + + // Verify remote contains the key after push completes. + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI2 := git.NewGitAPIImpl(remoteRunner) + newHead, err := remoteAPI2.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + oid, typ, err := remoteAPI2.ResolvePathObject(ctx, newHead, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + rrc, err := remoteAPI2.BlobReader(ctx, oid) + require.NoError(t, err) + got, err := io.ReadAll(rrc) + require.NoError(t, err) + require.NoError(t, rrc.Close()) + require.Equal(t, []byte("from local\n"), got) +} + func TestGitBlobstore_RemoteManaged_CheckAndPut_RemoteHeadTruth(t *testing.T) { requireGitOnPath(t) @@ -662,10 +768,10 @@ func TestGitBlobstore_RemoteManaged_PutOverwritesDivergedLocalRef_NoMergeCommit( remoteHeadAfter, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) require.NoError(t, err) - // Snapshot-only semantics: new remote head should be a non-merge commit with no parent. + // The commit should have a single parent (the remote head we fetched and built on top of). out, err := remoteRunner.Run(ctx, git.RunOptions{}, "cat-file", "-p", remoteHeadAfter.String()) require.NoError(t, err) - require.NotContains(t, string(out), "\nparent ") + require.Contains(t, string(out), "\nparent ") // Local-only divergence should not be present on remote. _, _, err = remoteAPI.ResolvePathObject(ctx, remoteHeadAfter, "local") diff --git a/go/store/blobstore/internal/git/api.go b/go/store/blobstore/internal/git/api.go index 5ae62d870f7..4e6a039b39a 100644 --- a/go/store/blobstore/internal/git/api.go +++ b/go/store/blobstore/internal/git/api.go @@ -120,6 +120,12 @@ type GitAPI interface { // GIT_DIR=... git fetch +: FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error + // RevListCount returns the number of commits reachable from |oid| (inclusive), + // counting at most |maxCount| commits. Pass 0 for unlimited. + // Equivalent plumbing: + // GIT_DIR=... git rev-list --count [--max-count=N] + RevListCount(ctx context.Context, oid OID, maxCount int) (int, error) + // PushRefWithLease pushes |srcRef| to |dstRef| on |remote|, but only if the remote's |dstRef| // equals |expectedDstOID| (force-with-lease). If |expectedDstOID| is empty, it enforces that // the remote |dstRef| is missing (bootstrap / create-if-missing semantics). diff --git a/go/store/blobstore/internal/git/impl.go b/go/store/blobstore/internal/git/impl.go index 32e2c32d1ee..9b3e11f90ff 100644 --- a/go/store/blobstore/internal/git/impl.go +++ b/go/store/blobstore/internal/git/impl.go @@ -287,6 +287,19 @@ func (a *GitAPIImpl) CommitTree(ctx context.Context, tree OID, parent *OID, mess return OID(oid), nil } +func (a *GitAPIImpl) RevListCount(ctx context.Context, oid OID, maxCount int) (int, error) { + args := []string{"rev-list", "--count"} + if maxCount > 0 { + args = append(args, fmt.Sprintf("--max-count=%d", maxCount)) + } + args = append(args, oid.String()) + out, err := a.r.Run(ctx, RunOptions{}, args...) + if err != nil { + return 0, err + } + return strconv.Atoi(strings.TrimSpace(string(out))) +} + func (a *GitAPIImpl) UpdateRefCAS(ctx context.Context, ref string, newOID OID, oldOID OID, msg string) error { args := []string{"update-ref"} if msg != "" { diff --git a/go/store/nbs/bs_manifest.go b/go/store/nbs/bs_manifest.go index 1a812fde5e5..0142e6b16f2 100644 --- a/go/store/nbs/bs_manifest.go +++ b/go/store/nbs/bs_manifest.go @@ -38,14 +38,12 @@ func (bsm blobstoreManifest) Name() string { func manifestVersionAndContents(ctx context.Context, bs blobstore.Blobstore) (string, manifestContents, error) { reader, _, ver, err := bs.Get(ctx, manifestFile, blobstore.AllRange) - if err != nil { return "", manifestContents{}, err } defer reader.Close() contents, err := parseManifest(reader) - if err != nil { return "", manifestContents{}, err } @@ -96,7 +94,6 @@ func updateBSWithChecker(ctx context.Context, behavior dherrors.FatalBehavior, b } ver, contents, err := manifestVersionAndContents(ctx, bs) - if err != nil && !blobstore.IsNotFoundError(err) { return manifestContents{}, err } @@ -116,7 +113,6 @@ func updateBSWithChecker(ctx context.Context, behavior dherrors.FatalBehavior, b } _, err = bs.CheckAndPut(ctx, ver, manifestFile, int64(buffer.Len()), buffer) - if err != nil { if !blobstore.IsCheckAndPutError(err) { return manifestContents{}, err diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 86e586b19a0..ea948f6e8f5 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -82,6 +82,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, behavior dherrors.Fa if _, err = bsp.bs.Concatenate(ctx, name, []string{recordsName, tailName}); err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } + rdr := &bsTableReaderAt{key: name, bs: bsp.bs} src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize) if err != nil { diff --git a/go/store/nbs/git_blobstore_empty_remote_test.go b/go/store/nbs/git_blobstore_empty_remote_test.go index 89340c1efe7..9b952be3f34 100644 --- a/go/store/nbs/git_blobstore_empty_remote_test.go +++ b/go/store/nbs/git_blobstore_empty_remote_test.go @@ -135,8 +135,8 @@ func TestNBS_NewGitStore_DefaultsMaxPartSizeTo50MB(t *testing.T) { defer store.Close() // Assert the underlying blobstore is a GitBlobstore and that chunked writes are enabled by default. - bsp, ok := store.persister.(*blobstorePersister) - require.True(t, ok, "expected persister to be *blobstorePersister, got %T", store.persister) + bsp, ok := store.persister.(*singleBlobBSPersister) + require.True(t, ok, "expected persister to be *singleBlobBSPersister, got %T", store.persister) gbs, ok := bsp.bs.(*blobstore.GitBlobstore) require.True(t, ok, "expected blobstore to be *blobstore.GitBlobstore, got %T", bsp.bs) diff --git a/go/store/nbs/single_blob_bs_persister.go b/go/store/nbs/single_blob_bs_persister.go new file mode 100644 index 00000000000..fb1b68bfc54 --- /dev/null +++ b/go/store/nbs/single_blob_bs_persister.go @@ -0,0 +1,159 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nbs + +import ( + "bytes" + "context" + "io" + "time" + + dherrors "github.com/dolthub/dolt/go/libraries/utils/errors" + "github.com/dolthub/dolt/go/store/blobstore" + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/hash" +) + +// singleBlobBSPersister writes table files as single blobs (no .records/.tail split) +// while still supporting ConjoinAll. This avoids intermediate blobs in the git tree +// that the standard blobstorePersister creates. +type singleBlobBSPersister struct { + bs blobstore.Blobstore + q MemoryQuotaProvider + blockSize uint64 +} + +var _ tablePersister = &singleBlobBSPersister{} +var _ tableFilePersister = &singleBlobBSPersister{} + +func (bsp *singleBlobBSPersister) Persist(ctx context.Context, behavior dherrors.FatalBehavior, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { + address, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats) + if err != nil { + return emptyChunkSource{}, gcBehavior_Continue, err + } else if gcb != gcBehavior_Continue { + return emptyChunkSource{}, gcb, nil + } else if chunkCount == 0 { + return emptyChunkSource{}, gcBehavior_Continue, nil + } + name := address.String() + + _, err = bsp.bs.Put(ctx, name, int64(len(data)), bytes.NewBuffer(data)) + if err != nil { + return nil, gcBehavior_Continue, err + } + + rdr := &bsTableReaderAt{key: name, bs: bsp.bs} + src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize) + if err != nil { + return nil, gcBehavior_Continue, err + } + return src, gcBehavior_Continue, nil +} + +func (bsp *singleBlobBSPersister) ConjoinAll(ctx context.Context, behavior dherrors.FatalBehavior, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) { + plan, err := planRangeCopyConjoin(ctx, sources, bsp.q, stats) + if err != nil { + return nil, nil, err + } + defer plan.closer() + + if plan.chunkCount == 0 { + return emptyChunkSource{}, nil, nil + } + + name := plan.name.String() + plan.suffix + + // Read chunk records from each source via range reads and stream them + // together with the merged index into a single blob. No intermediate + // .records or .tail blobs are created. + readers := make([]io.Reader, 0, len(plan.sources.sws)+1) + closers := make([]io.Closer, 0, len(plan.sources.sws)) + + for _, sws := range plan.sources.sws { + srcName := sws.source.hash().String() + sws.source.suffix() + dataLen := int64(sws.dataLen) + rng := blobstore.NewBlobRange(0, dataLen) + rdr, _, _, err := bsp.bs.Get(ctx, srcName, rng) + if err != nil { + for _, c := range closers { + c.Close() + } + return nil, nil, err + } + closers = append(closers, rdr) + readers = append(readers, io.LimitReader(rdr, dataLen)) + } + + readers = append(readers, bytes.NewReader(plan.mergedIndex)) + totalSize := int64(plan.totalCompressedData) + int64(len(plan.mergedIndex)) + + _, err = bsp.bs.Put(ctx, name, totalSize, io.MultiReader(readers...)) + for _, c := range closers { + c.Close() + } + if err != nil { + return nil, nil, err + } + + var cs chunkSource + if plan.suffix == ArchiveFileSuffix { + cs, err = newBSArchiveChunkSource(ctx, bsp.bs, plan.name, bsp.q, stats) + } else { + cs, err = newBSTableChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats) + } + + return cs, func() {}, err +} + +func (bsp *singleBlobBSPersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { + cs, err := newBSTableChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) + if err == nil { + return cs, nil + } + + if blobstore.IsNotFoundError(err) { + return newBSArchiveChunkSource(ctx, bsp.bs, name, bsp.q, stats) + } + + return nil, err +} + +func (bsp *singleBlobBSPersister) Exists(ctx context.Context, name string, _ uint32, _ *Stats) (bool, error) { + return bsp.bs.Exists(ctx, name) +} + +func (bsp *singleBlobBSPersister) PruneTableFiles(ctx context.Context, keeper func() []hash.Hash, t time.Time) error { + return nil +} + +func (bsp *singleBlobBSPersister) Close() error { + if c, ok := bsp.bs.(io.Closer); ok { + return c.Close() + } + return nil +} + +func (bsp *singleBlobBSPersister) AccessMode() chunks.ExclusiveAccessMode { + return chunks.ExclusiveAccessMode_Shared +} + +func (bsp *singleBlobBSPersister) Path() string { + return "" +} + +func (bsp *singleBlobBSPersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, _ uint64) error { + _, err := bsp.bs.Put(ctx, name, int64(fileSz), r) + return err +} diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 6c3ad9aca9d..2df454c49a5 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -665,7 +665,10 @@ func NewGitStore(ctx context.Context, nbfVerStr string, gitDir string, ref strin if err != nil { return nil, err } - return NewBSStore(ctx, nbfVerStr, bs, memTableSize, q) + + mm := makeManifestManager(blobstoreManifest{bs}) + p := &singleBlobBSPersister{bs, q, s3BlockSize} + return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize) } // NewNoConjoinGitStore returns an nbs implementation backed by a GitBlobstore, but disables conjoin. @@ -681,7 +684,10 @@ func NewNoConjoinGitStore(ctx context.Context, nbfVerStr string, gitDir string, if err != nil { return nil, err } - return NewNoConjoinBSStore(ctx, nbfVerStr, bs, memTableSize, q) + + mm := makeManifestManager(blobstoreManifest{bs}) + p := &noConjoinBlobstorePersister{bs, q, s3BlockSize} + return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, noopConjoiner{}, memTableSize) } // NewBSStore returns an nbs implementation backed by a Blobstore @@ -804,14 +810,12 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager defer nbs.stats.OpenLatency.SampleTimeSince(t1) exists, contents, _, err := nbs.manifestMgr.Fetch(ctx, nbs.stats) - if err != nil { return nil, err } if exists { newTables, err := nbs.tables.rebase(ctx, contents.specs, nil, nbs.stats) - if err != nil { return nil, err }