diff --git a/go/store/blobstore/git_blobstore.go b/go/store/blobstore/git_blobstore.go index c005dd00d69..6b9a72e08f6 100644 --- a/go/store/blobstore/git_blobstore.go +++ b/go/store/blobstore/git_blobstore.go @@ -21,19 +21,19 @@ import ( "fmt" "io" "math" - "os" - "path/filepath" "sort" "strconv" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" + "github.com/google/uuid" git "github.com/dolthub/dolt/go/store/blobstore/internal/git" ) -const gitblobstorePartNameWidth = 8 // "00000001" +const gitblobstorePartNameWidth = 4 // "0001" type chunkPartRef struct { oidHex string @@ -53,7 +53,7 @@ type treeWrite struct { type putPlan struct { writes []treeWrite - // If true, the key should be represented as a tree (chunked parts under key/NNNNNNNN). + // If true, the key should be represented as a tree (chunked parts under key/NNNN). chunked bool } @@ -254,14 +254,15 @@ func (c *concatReadCloser) Close() error { // GitBlobstore is a Blobstore implementation backed by a git repository's object // database (bare repo or .git directory). It stores keys as paths within the tree // of the commit referenced by a git ref (e.g. refs/dolt/data). -// -// This implementation is being developed in phases. Read paths were implemented first, -// then write paths were added incrementally. type GitBlobstore struct { - gitDir string - ref string - runner *git.Runner - api git.GitAPI + gitDir string + remoteRef string + localRef string + runner *git.Runner + api git.GitAPI + remoteName string + remoteTrackingRef string + mu sync.Mutex // identity, when non-nil, is used as the author/committer identity for new commits. // When nil, we prefer whatever identity git derives from env/config, falling back // to a deterministic default only if git reports the identity is missing. @@ -277,8 +278,7 @@ type GitBlobstore struct { var _ Blobstore = (*GitBlobstore)(nil) // NewGitBlobstore creates a new GitBlobstore rooted at |gitDir| and |ref|. -// |gitDir| should point at a bare repo directory or a .git directory. Put is implemented, -// while CheckAndPut and Concatenate are still unimplemented (see type-level docs). +// |gitDir| should point at a bare repo directory or a .git directory. func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) { return NewGitBlobstoreWithOptions(gitDir, ref, GitBlobstoreOptions{}) } @@ -296,6 +296,9 @@ type GitBlobstoreOptions struct { // MaxPartSize enables chunked-object writes when non-zero. // Read paths always support chunked objects if encountered. MaxPartSize uint64 + // RemoteName is the git remote name to use for remote-managed mode (e.g. "origin"). + // If empty, it defaults to "origin". + RemoteName string } // NewGitBlobstoreWithOptions creates a GitBlobstore rooted at |gitDir| and |ref|. @@ -304,18 +307,165 @@ func NewGitBlobstoreWithOptions(gitDir, ref string, opts GitBlobstoreOptions) (* if err != nil { return nil, err } + + remoteName := opts.RemoteName + if remoteName == "" { + remoteName = "origin" + } + remoteRef := ref + remoteTrackingRef := RemoteTrackingRef(remoteName, remoteRef) + localRef := OwnedLocalRef(remoteName, remoteRef, uuid.NewString()) + return &GitBlobstore{ - gitDir: gitDir, - ref: ref, - runner: r, - api: git.NewGitAPIImpl(r), - identity: opts.Identity, - maxPartSize: opts.MaxPartSize, + gitDir: gitDir, + remoteRef: remoteRef, + localRef: localRef, + runner: r, + api: git.NewGitAPIImpl(r), + remoteName: remoteName, + remoteTrackingRef: remoteTrackingRef, + identity: opts.Identity, + maxPartSize: opts.MaxPartSize, }, nil } func (gbs *GitBlobstore) Path() string { - return fmt.Sprintf("%s@%s", gbs.gitDir, gbs.ref) + return fmt.Sprintf("%s@%s", gbs.gitDir, gbs.remoteRef) +} + +func (gbs *GitBlobstore) validateRemoteManaged() error { + if gbs.remoteName == "" || gbs.remoteRef == "" || gbs.remoteTrackingRef == "" || gbs.localRef == "" { + return fmt.Errorf("gitblobstore: remote-managed mode misconfigured (remoteName=%q remoteRef=%q trackingRef=%q localRef=%q)", gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef, gbs.localRef) + } + return nil +} + +// CleanupOwnedLocalRef best-effort deletes this blobstore instance's UUID-owned local ref. +// +// This is an optional hygiene API: by default, UUID-owned local refs may accumulate in the +// repo over time. Callers that care about cleanup (e.g. tests) may invoke this explicitly. +func (gbs *GitBlobstore) CleanupOwnedLocalRef(ctx context.Context) error { + gbs.mu.Lock() + defer gbs.mu.Unlock() + + _, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) + if err != nil { + return err + } + if !ok { + return nil + } + _, err = gbs.runner.Run(ctx, git.RunOptions{}, "update-ref", "-d", gbs.localRef) + return err +} + +func (gbs *GitBlobstore) syncForRead(ctx context.Context) error { + if err := gbs.validateRemoteManaged(); err != nil { + return err + } + gbs.mu.Lock() + defer gbs.mu.Unlock() + + // 1) Fetch remote ref into our remote-tracking ref. + if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil { + return err + } + + remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef) + if err != nil { + return err + } + if !okRemote { + return &git.RefNotFoundError{Ref: gbs.remoteTrackingRef} + } + + // 2) Force-set owned local ref to remote head (no merge; remote is source-of-truth). + return gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync read") +} + +func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string, build func(parent git.OID, ok bool) (git.OID, error)) (string, error) { + if err := gbs.validateRemoteManaged(); err != nil { + return "", err + } + gbs.mu.Lock() + defer gbs.mu.Unlock() + + policy := gbs.casRetryPolicy(ctx) + + var ver string + op := func() error { + // 1) Fetch remote state into local tracking ref. + if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil { + return err + } + remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef) + if err != nil { + return backoff.Permanent(err) + } + if !okRemote { + return backoff.Permanent(&git.RefNotFoundError{Ref: gbs.remoteTrackingRef}) + } + + // 2) Force-set owned local ref to remote head (remote is source-of-truth). + if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync write"); err != nil { + return backoff.Permanent(err) + } + + // 3) Apply this operation's changes on top of the remote head. + newCommit, err := build(remoteHead, true) + if err != nil { + return backoff.Permanent(err) + } + if err := gbs.api.UpdateRef(ctx, gbs.localRef, newCommit, msg); err != nil { + return backoff.Permanent(err) + } + + // 4) Push local ref to remote with lease. + if err := gbs.api.PushRefWithLease(ctx, gbs.remoteName, gbs.localRef, gbs.remoteRef, remoteHead); err != nil { + return err + } + + ver, err = gbs.resolveKeyVersionAtCommit(ctx, newCommit, key) + if err != nil { + return backoff.Permanent(err) + } + return nil + } + + if err := backoff.Retry(op, policy); err != nil { + if ctx.Err() != nil { + return "", ctx.Err() + } + return "", err + } + return ver, nil +} + +func (gbs *GitBlobstore) putWithRemoteSync(ctx context.Context, key string, plan putPlan, msg string) (string, error) { + return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) { + return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, plan, msg) + }) +} + +func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string) (string, error) { + var cachedPlan *putPlan + return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) { + actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, true, key) + if err != nil { + return git.OID(""), err + } + if expectedVersion != actualKeyVersion { + return git.OID(""), CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualKeyVersion} + } + if cachedPlan == nil { + plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) + if err != nil { + return git.OID(""), err + } + cachedPlan = &plan + } + return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, *cachedPlan, msg) + }) } func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) { @@ -323,7 +473,10 @@ func (gbs *GitBlobstore) Exists(ctx context.Context, key string) (bool, error) { if err != nil { return false, err } - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err := gbs.syncForRead(ctx); err != nil { + return false, err + } + commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { return false, err } @@ -345,6 +498,9 @@ func (gbs *GitBlobstore) Get(ctx context.Context, key string, br BlobRange) (io. if err != nil { return nil, 0, "", err } + if err := gbs.syncForRead(ctx); err != nil { + return nil, 0, "", err + } commit, err := gbs.resolveCommitForGet(ctx, key) if err != nil { return nil, 0, "", err @@ -414,10 +570,9 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie return nil, 0, fmt.Errorf("gitblobstore: chunked tree has no parts") } - width := len(entries[0].Name) - // First pass: validate names + types, and determine width. - if width < 4 { - return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected at least 4 digits)", entries[0].Name) + // GitBlobstore chunked trees use fixed-width 4-digit part names: 0001, 0002, ... + if len(entries[0].Name) != gitblobstorePartNameWidth { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", entries[0].Name, gitblobstorePartNameWidth) } parts := make([]chunkPartRef, 0, len(entries)) @@ -426,18 +581,18 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie if e.Type != git.ObjectTypeBlob { return nil, 0, fmt.Errorf("gitblobstore: invalid part %q: expected blob, got %q", e.Name, e.Type) } - if len(e.Name) != width { - return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, width) + if len(e.Name) != gitblobstorePartNameWidth { + return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected width %d)", e.Name, gitblobstorePartNameWidth) } n, err := strconv.Atoi(e.Name) if err != nil { return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected digits): %w", e.Name, err) } if n != i+1 { - want := fmt.Sprintf("%0*d", width, i+1) + want := fmt.Sprintf("%0*d", gitblobstorePartNameWidth, i+1) return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want) } - if want := fmt.Sprintf("%0*d", width, n); want != e.Name { + if want := fmt.Sprintf("%0*d", gitblobstorePartNameWidth, n); want != e.Name { return nil, 0, fmt.Errorf("gitblobstore: invalid part name %q (expected %q)", e.Name, want) } @@ -458,7 +613,7 @@ func (gbs *GitBlobstore) validateAndSizeChunkedParts(ctx context.Context, entrie } func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (commit git.OID, err error) { - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { return git.OID(""), err } @@ -471,7 +626,7 @@ func (gbs *GitBlobstore) resolveCommitForGet(ctx context.Context, key string) (c if key == "manifest" { return git.OID(""), NotFound{Key: key} } - return git.OID(""), &git.RefNotFoundError{Ref: gbs.ref} + return git.OID(""), &git.RefNotFoundError{Ref: gbs.localRef} } func (gbs *GitBlobstore) resolveObjectForGet(ctx context.Context, commit git.OID, key string) (oid git.OID, typ git.ObjectType, err error) { @@ -499,6 +654,11 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r return "", err } + // Ensure the idempotent "key exists" fast-path observes remote state. + if err := gbs.syncForRead(ctx); err != nil { + return "", err + } + // Many NBS/table-file writes are content-addressed: if the key already exists, callers // assume it refers to the same bytes and treat the operation as idempotent. // @@ -515,13 +675,13 @@ func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, r msg := fmt.Sprintf("gitblobstore: put %s", key) - // Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|, + // Hash the contents once. If we need to retry due to a concurrent remote advance (lease failure), // we can reuse the resulting object OIDs without re-reading |reader|. plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) if err != nil { return "", err } - return gbs.putWithCASRetries(ctx, key, plan, msg) + return gbs.putWithRemoteSync(ctx, key, plan, msg) } func (gbs *GitBlobstore) planPutWrites(ctx context.Context, key string, totalSize int64, reader io.Reader) (putPlan, error) { @@ -596,44 +756,6 @@ func (gbs *GitBlobstore) hashParts(ctx context.Context, reader io.Reader) (parts return parts, partOIDs, total, nil } -func (gbs *GitBlobstore) putWithCASRetries(ctx context.Context, key string, plan putPlan, msg string) (string, error) { - // Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop - // under the hood. This matches typical object-store semantics more closely than an - // unconditional ref update (which could clobber other keys). - policy := gbs.casRetryPolicy(ctx) - - var ver string - op := func() error { - parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - if err != nil { - return backoff.Permanent(err) - } - - newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, plan, msg) - if err != nil { - return backoff.Permanent(err) - } - - if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil { - return err - } - - ver, err = gbs.resolveKeyVersionAtCommit(ctx, newCommit, key) - if err != nil { - return backoff.Permanent(err) - } - return nil - } - - if err := backoff.Retry(op, policy); err != nil { - if ctx.Err() != nil { - return "", ctx.Err() - } - return "", err - } - return ver, nil -} - func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff { const maxRetries = 31 // 32 total attempts (initial + retries) bo := backoff.NewExponentialBackOff() @@ -646,7 +768,7 @@ func (gbs *GitBlobstore) casRetryPolicy(ctx context.Context) backoff.BackOff { } func (gbs *GitBlobstore) buildCommitForKeyWrite(ctx context.Context, parent git.OID, hasParent bool, key string, plan putPlan, msg string) (git.OID, error) { - _, indexFile, cleanup, err := newTempIndex() + _, indexFile, cleanup, err := git.NewTempIndex() if err != nil { return "", err } @@ -733,39 +855,6 @@ func (gbs *GitBlobstore) removeKeyConflictsFromIndex(ctx context.Context, parent } } -func (gbs *GitBlobstore) updateRefCASForWrite(ctx context.Context, parent git.OID, haveParent bool, newCommit git.OID, msg string) error { - if !haveParent { - // Create-only CAS: oldOID=all-zero requires the ref to not exist. This avoids - // losing concurrent writes when multiple goroutines create the ref at once. - const zeroOID = git.OID("0000000000000000000000000000000000000000") - if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, zeroOID, msg); err != nil { - if gbs.refAdvanced(ctx, parent) { - return err - } - return backoff.Permanent(err) - } - return nil - } - - if err := gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg); err != nil { - // If the ref changed since we read |parent|, retry on the new head. Otherwise - // surface the error (e.g. permissions, corruption). - if gbs.refAdvanced(ctx, parent) { - return err - } - return backoff.Permanent(err) - } - return nil -} - -func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool { - if ctx.Err() != nil { - return false - } - cur, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - return err == nil && ok && cur != old -} - func (gbs *GitBlobstore) resolveKeyVersionAtCommit(ctx context.Context, commit git.OID, key string) (string, error) { oid, _, err := gbs.api.ResolvePathObject(ctx, commit, key) if err != nil { @@ -779,7 +868,7 @@ func (gbs *GitBlobstore) tryFastSucceedPutIfKeyExists(ctx context.Context, key s return "", false, nil } - commit, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + commit, haveCommit, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { return "", false, err } @@ -805,61 +894,7 @@ func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key s } msg := fmt.Sprintf("gitblobstore: checkandput %s", key) - - policy := gbs.casRetryPolicy(ctx) - - var newKeyVersion string - var cachedPlan *putPlan - op := func() error { - parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) - if err != nil { - return backoff.Permanent(err) - } - - actualKeyVersion, err := gbs.currentKeyVersion(ctx, parent, ok, key) - if err != nil { - return backoff.Permanent(err) - } - if expectedVersion != actualKeyVersion { - return backoff.Permanent(CheckAndPutError{Key: key, ExpectedVersion: expectedVersion, ActualVersion: actualKeyVersion}) - } - - // Only hash/consume the reader once we know the expectedVersion matches. - // If we need to retry due to unrelated ref advances, reuse the cached plan so we - // don't re-read |reader| (which may not be rewindable). - if cachedPlan == nil { - plan, err := gbs.planPutWrites(ctx, key, totalSize, reader) - if err != nil { - return backoff.Permanent(err) - } - cachedPlan = &plan - } - - newCommit, err := gbs.buildCommitForKeyWrite(ctx, parent, ok, key, *cachedPlan, msg) - if err != nil { - return backoff.Permanent(err) - } - - if err := gbs.updateRefCASForWrite(ctx, parent, ok, newCommit, msg); err != nil { - return err - } - - ver, err := gbs.resolveKeyVersionAtCommit(ctx, newCommit, key) - if err != nil { - return backoff.Permanent(err) - } - newKeyVersion = ver - return nil - } - - if err := backoff.Retry(op, policy); err != nil { - if ctx.Err() != nil { - return "", ctx.Err() - } - return "", err - } - - return newKeyVersion, nil + return gbs.checkAndPutWithRemoteSync(ctx, expectedVersion, key, totalSize, reader, msg) } func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID, haveCommit bool, key string) (string, error) { @@ -884,6 +919,9 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] if err != nil { return "", err } + if err := gbs.syncForRead(ctx); err != nil { + return "", err + } if len(sources) == 0 { return "", fmt.Errorf("gitblobstore: concatenate requires at least one source") } @@ -905,7 +943,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] } // Resolve a snapshot commit for the sources. - commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.localRef) if err != nil { return "", err } @@ -914,7 +952,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] if key == "manifest" { return "", NotFound{Key: key} } - return "", &git.RefNotFoundError{Ref: gbs.ref} + return "", &git.RefNotFoundError{Ref: gbs.localRef} } totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources) @@ -937,7 +975,7 @@ func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources [] } msg := fmt.Sprintf("gitblobstore: concatenate %s", key) - return gbs.putWithCASRetries(ctx, key, plan, msg) + return gbs.putWithRemoteSync(ctx, key, plan, msg) } func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) { @@ -1179,26 +1217,6 @@ func isMissingGitIdentityErr(err error) bool { strings.Contains(msg, "empty ident name") } -func newTempIndex() (dir, indexFile string, cleanup func(), err error) { - // Create a unique temp index file. This is intentionally *not* placed under GIT_DIR: - // - some git dirs may be read-only or otherwise unsuitable for scratch files - // - we don't want to leave temp files inside the repo on crashes - // - // Note: git will also create a sibling lock file (.lock) during index writes. - f, err := os.CreateTemp("", "dolt-gitblobstore-index-") - if err != nil { - return "", "", nil, err - } - indexFile = f.Name() - _ = f.Close() - dir = filepath.Dir(indexFile) - cleanup = func() { - _ = os.Remove(indexFile) - _ = os.Remove(indexFile + ".lock") - } - return dir, indexFile, cleanup, nil -} - // normalizeGitTreePath normalizes and validates a blobstore key for use as a git tree path. // // Rules: diff --git a/go/store/blobstore/git_blobstore_chunked_checkandput_test.go b/go/store/blobstore/git_blobstore_chunked_checkandput_test.go index cc1e18ff9e5..f4f83ba1abf 100644 --- a/go/store/blobstore/git_blobstore_chunked_checkandput_test.go +++ b/go/store/blobstore/git_blobstore_chunked_checkandput_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" + git "github.com/dolthub/dolt/go/store/blobstore/internal/git" "github.com/dolthub/dolt/go/store/testutils/gitrepo" ) @@ -30,10 +31,19 @@ func TestGitBlobstore_CheckAndPut_ChunkedRoundTrip_CreateOnly(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) require.NoError(t, err) - bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ Identity: testIdentity(), MaxPartSize: 3, }) @@ -60,16 +70,25 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotConsumeReader_WithChunkingEnabl requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) require.NoError(t, err) // Seed any commit so actualVersion != "". - bs0, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()}) + bs0, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{Identity: testIdentity()}) require.NoError(t, err) _, err = bs0.Put(ctx, "x", 1, bytes.NewReader([]byte("x"))) require.NoError(t, err) - bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ Identity: testIdentity(), MaxPartSize: 3, }) diff --git a/go/store/blobstore/git_blobstore_chunked_get_test.go b/go/store/blobstore/git_blobstore_chunked_get_test.go index 2f6054f632a..1ef49f54265 100644 --- a/go/store/blobstore/git_blobstore_chunked_get_test.go +++ b/go/store/blobstore/git_blobstore_chunked_get_test.go @@ -31,24 +31,31 @@ func TestGitBlobstore_Get_ChunkedTree_AllAndRanges(t *testing.T) { } ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") require.NoError(t, err) part1 := []byte("abc") part2 := []byte("defgh") - commitOID, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + commitOID, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "chunked/0001": part1, "chunked/0002": part2, }, "seed chunked tree") require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) - api := git.NewGitAPIImpl(runner) + api := git.NewGitAPIImpl(remoteRunner) treeOID, _, err := api.ResolvePathObject(ctx, git.OID(commitOID), "chunked") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) wantAll := append(append([]byte(nil), part1...), part2...) @@ -82,17 +89,24 @@ func TestGitBlobstore_Get_ChunkedTree_InvalidPartsError(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") require.NoError(t, err) // Gap: 0001, 0003 - _, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "chunked/0001": []byte("a"), "chunked/0003": []byte("b"), }, "seed invalid chunked tree") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) _, _, err = GetBytes(ctx, bs, "chunked", AllRange) diff --git a/go/store/blobstore/git_blobstore_chunked_put_test.go b/go/store/blobstore/git_blobstore_chunked_put_test.go index 3096310bc62..6c43adc7853 100644 --- a/go/store/blobstore/git_blobstore_chunked_put_test.go +++ b/go/store/blobstore/git_blobstore_chunked_put_test.go @@ -29,10 +29,19 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) require.NoError(t, err) - bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ Identity: testIdentity(), MaxPartSize: 3, }) @@ -47,7 +56,7 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) { require.Equal(t, ver, ver2) require.Equal(t, want, got) - runner, err := git.NewRunner(repo.GitDir) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) @@ -62,24 +71,33 @@ func TestGitBlobstore_Put_ChunkedWritesTreeParts(t *testing.T) { entries, err := api.ListTree(ctx, head, "big") require.NoError(t, err) require.Len(t, entries, 4) - require.Equal(t, "00000001", entries[0].Name) - require.Equal(t, "00000004", entries[3].Name) + require.Equal(t, "0001", entries[0].Name) + require.Equal(t, "0004", entries[3].Name) } func TestGitBlobstore_Put_IdempotentDoesNotChangeExistingRepresentation(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) require.NoError(t, err) - bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ Identity: testIdentity(), MaxPartSize: 3, }) require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) diff --git a/go/store/blobstore/git_blobstore_helpers_test.go b/go/store/blobstore/git_blobstore_helpers_test.go index 8080f4e8382..35c96f0cd5a 100644 --- a/go/store/blobstore/git_blobstore_helpers_test.go +++ b/go/store/blobstore/git_blobstore_helpers_test.go @@ -33,6 +33,8 @@ type fakeGitAPI struct { listTree func(ctx context.Context, commit git.OID, treePath string) ([]git.TreeEntry, error) blobSize func(ctx context.Context, oid git.OID) (int64, error) blobReader func(ctx context.Context, oid git.OID) (io.ReadCloser, error) + fetchRef func(ctx context.Context, remote string, srcRef string, dstRef string) error + pushRefWithLease func(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error } func (f fakeGitAPI) TryResolveRefCommit(ctx context.Context, ref string) (git.OID, bool, error) { @@ -86,6 +88,18 @@ func (f fakeGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID func (f fakeGitAPI) UpdateRef(ctx context.Context, ref string, newOID git.OID, msg string) error { panic("unexpected call") } +func (f fakeGitAPI) FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error { + if f.fetchRef == nil { + panic("unexpected call") + } + return f.fetchRef(ctx, remote, srcRef, dstRef) +} +func (f fakeGitAPI) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error { + if f.pushRefWithLease == nil { + panic("unexpected call") + } + return f.pushRefWithLease(ctx, remote, srcRef, dstRef, expectedDstOID) +} func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { ctx := context.Background() @@ -97,7 +111,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { return git.OID("0123456789abcdef0123456789abcdef01234567"), true, nil }, } - gbs := &GitBlobstore{ref: DoltDataRef, api: api} + gbs := &GitBlobstore{localRef: DoltDataRef, api: api} commit, err := gbs.resolveCommitForGet(ctx, "k") require.NoError(t, err) @@ -110,7 +124,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { return git.OID(""), false, nil }, } - gbs := &GitBlobstore{ref: DoltDataRef, api: api} + gbs := &GitBlobstore{localRef: DoltDataRef, api: api} _, err := gbs.resolveCommitForGet(ctx, "manifest") var nf NotFound @@ -124,7 +138,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { return git.OID(""), false, nil }, } - gbs := &GitBlobstore{ref: DoltDataRef, api: api} + gbs := &GitBlobstore{localRef: DoltDataRef, api: api} _, err := gbs.resolveCommitForGet(ctx, "somekey") var rnf *git.RefNotFoundError @@ -139,7 +153,7 @@ func TestGitBlobstoreHelpers_resolveCommitForGet(t *testing.T) { return git.OID(""), false, sentinel }, } - gbs := &GitBlobstore{ref: DoltDataRef, api: api} + gbs := &GitBlobstore{localRef: DoltDataRef, api: api} _, err := gbs.resolveCommitForGet(ctx, "k") require.ErrorIs(t, err, sentinel) diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index 18b7156ec55..deaf1325561 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "sync/atomic" "testing" @@ -42,14 +43,30 @@ func testIdentity() *git.Identity { return &git.Identity{Name: "gitblobstore test", Email: "gitblobstore@test.invalid"} } -func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) { +func newRemoteAndLocalRepos(t *testing.T, ctx context.Context) (remoteRepo *gitrepo.Repo, localRepo *gitrepo.Repo, localRunner *git.Runner) { + t.Helper() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + + localRepo, err = gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err = git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + return remoteRepo, localRepo, localRunner +} + +func TestGitBlobstore_MissingKeysAreNotFound(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) ok, err := bs.Exists(ctx, "manifest") @@ -60,35 +77,31 @@ func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) { require.Error(t, err) require.True(t, IsNotFoundError(err)) - // For non-manifest keys, missing the ref is a hard error. _, _, _, err = bs.Get(ctx, "table", AllRange) require.Error(t, err) - require.False(t, IsNotFoundError(err)) - var rnf *git.RefNotFoundError - require.True(t, errors.As(err, &rnf)) + require.True(t, IsNotFoundError(err)) } func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) want := []byte("hello manifest\n") - commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "manifest": want, "dir/file": []byte("abc"), }, "seed") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) - api := git.NewGitAPIImpl(runner) - manifestOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "manifest") + remoteAPI := git.NewGitAPIImpl(remoteRunner) + manifestOID, _, err := remoteAPI.ResolvePathObject(ctx, git.OID(commit), "manifest") require.NoError(t, err) ok, err := bs.Exists(ctx, "manifest") @@ -117,19 +130,421 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { _ = rc.Close() } -func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) { +func TestGitBlobstore_RemoteManaged_ExistsFetchesAndAligns(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + remoteCommit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "manifest": []byte("from remote\n"), + }, "seed remote") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + }) + require.NoError(t, err) + + ok, err := bs.Exists(ctx, "manifest") + require.NoError(t, err) + require.True(t, ok) + + require.Equal(t, DoltDataRef, bs.remoteRef) + require.Equal(t, RemoteTrackingRef("origin", DoltDataRef), bs.remoteTrackingRef) + require.NotEqual(t, bs.remoteRef, bs.localRef) + require.NotEqual(t, bs.remoteTrackingRef, bs.localRef) + require.True(t, strings.HasPrefix(bs.localRef, "refs/dolt/blobstore/origin/dolt/data/")) + + localAPI := git.NewGitAPIImpl(localRunner) + gotLocal, err := localAPI.ResolveRefCommit(ctx, bs.localRef) + require.NoError(t, err) + require.Equal(t, git.OID(remoteCommit), gotLocal) + + gotTracking, err := localAPI.ResolveRefCommit(ctx, bs.remoteTrackingRef) + require.NoError(t, err) + require.Equal(t, git.OID(remoteCommit), gotTracking) +} + +func TestGitBlobstore_RemoteAndLocalRefNaming_ConfigurableRemoteRef(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + + const remoteRef = "refs/heads/alt" + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, remoteRef, GitBlobstoreOptions{ + RemoteName: "origin", + }) + require.NoError(t, err) + + require.Equal(t, remoteRef, bs.remoteRef) + require.Equal(t, RemoteTrackingRef("origin", remoteRef), bs.remoteTrackingRef) + require.NotEmpty(t, bs.localRef) + require.NotEqual(t, bs.remoteRef, bs.localRef) + require.NotEqual(t, bs.remoteTrackingRef, bs.localRef) + require.True(t, strings.HasPrefix(bs.localRef, "refs/dolt/blobstore/origin/heads/alt/")) +} + +func TestGitBlobstore_CleanupOwnedLocalRef_DeletesRef(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) require.NoError(t, err) - _, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + _, err = localRepo.SetRefToTree(ctx, bs.localRef, map[string][]byte{ + "manifest": []byte("x"), + }, "seed localRef") + require.NoError(t, err) + + runner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + api := git.NewGitAPIImpl(runner) + + _, err = api.ResolveRefCommit(ctx, bs.localRef) + require.NoError(t, err) + + require.NoError(t, bs.CleanupOwnedLocalRef(ctx)) + + _, err = api.ResolveRefCommit(ctx, bs.localRef) + var rnf *git.RefNotFoundError + require.ErrorAs(t, err, &rnf) + require.Equal(t, bs.localRef, rnf.Ref) +} + +func TestGitBlobstore_RemoteManaged_PutPushesToRemote(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + _, err = remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "base": []byte("base\n"), + }, "seed remote") + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + ver, err := PutBytes(ctx, bs, "k", []byte("from local\n")) + require.NoError(t, err) + require.NotEmpty(t, ver) + + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + + remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + oid, typ, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + + rc, err := remoteAPI.BlobReader(ctx, oid) + require.NoError(t, err) + got, err := io.ReadAll(rc) + require.NoError(t, err) + require.NoError(t, rc.Close()) + require.Equal(t, []byte("from local\n"), got) +} + +type hookPushGitAPI struct { + git.GitAPI + onFirstPush func(ctx context.Context) + did atomic.Bool +} + +func (h *hookPushGitAPI) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID git.OID) error { + if h.onFirstPush != nil && !h.did.Swap(true) { + h.onFirstPush(ctx) + } + return h.GitAPI.PushRefWithLease(ctx, remote, srcRef, dstRef, expectedDstOID) +} + +func TestGitBlobstore_RemoteManaged_PutRetriesOnLeaseFailure(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + + // Seed remote so it has a head for the lease. + _, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "base", []byte("base\n"), testIdentity()) + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + var externalHead atomic.Value // git.OID + + // Advance the remote right before the first push to force a lease failure and trigger a retry. + bs.api = &hookPushGitAPI{ + GitAPI: bs.api, + onFirstPush: func(ctx context.Context) { + oid, _ := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "external", []byte("external\n"), testIdentity()) + if oid != "" { + externalHead.Store(oid) + } + }, + } + + ver, err := PutBytes(ctx, bs, "k", []byte("after retry\n")) + require.NoError(t, err) + require.NotEmpty(t, ver) + + remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + + // Verify we rebuilt on top of the advanced remote head (i.e. parent is externalHead). + if v := externalHead.Load(); v != nil { + wantParent := v.(git.OID) + out, err := remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHead.String()+"^") + require.NoError(t, err) + require.Equal(t, wantParent.String(), string(bytes.TrimSpace(out))) + _, err = remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHead.String()+"^2") + require.Error(t, err) // not a merge commit + } + + oid, typ, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "k") + require.NoError(t, err) + require.Equal(t, git.ObjectTypeBlob, typ) + + rc, err := remoteAPI.BlobReader(ctx, oid) + require.NoError(t, err) + got, err := io.ReadAll(rc) + require.NoError(t, err) + require.NoError(t, rc.Close()) + require.Equal(t, []byte("after retry\n"), got) +} + +func TestGitBlobstore_RemoteManaged_CheckAndPut_RemoteHeadTruth(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + + // Base manifest + base, err := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("base\n"), testIdentity()) + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + localAPI := git.NewGitAPIImpl(localRunner) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + // Fetch remote so local has the base object, then create a conflicting local commit. + require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef)) + require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, base, "set local to base")) + _, err = writeKeyToRef(ctx, localAPI, bs.localRef, "manifest", []byte("local\n"), testIdentity()) + require.NoError(t, err) + + // Advance remote independently so we have a conflict on "manifest". + _, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("remote\n"), testIdentity()) + require.NoError(t, err) + + remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + remoteManifestOID, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest") + require.NoError(t, err) + + // Remote is truth: CheckAndPut validates against remoteHead and applies changes on top of it. + newBytes := []byte("replayed\n") + ver, err := bs.CheckAndPut(ctx, remoteManifestOID.String(), "manifest", int64(len(newBytes)), bytes.NewReader(newBytes)) + require.NoError(t, err) + require.NotEmpty(t, ver) + + remoteHead, err = remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + oid, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest") + require.NoError(t, err) + rc, err := remoteAPI.BlobReader(ctx, oid) + require.NoError(t, err) + got, err := io.ReadAll(rc) + require.NoError(t, err) + require.NoError(t, rc.Close()) + require.Equal(t, newBytes, got) +} + +func TestGitBlobstore_RemoteManaged_CheckAndPut_ExpectedMatchesLocalButNotRemoteFails(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + + // Base manifest + base, err := writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("base\n"), testIdentity()) + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + localAPI := git.NewGitAPIImpl(localRunner) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + // Create a local-only manifest version. + require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef)) + require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, base, "set local to base")) + _, err = writeKeyToRef(ctx, localAPI, bs.localRef, "manifest", []byte("local\n"), testIdentity()) + require.NoError(t, err) + localHead, err := localAPI.ResolveRefCommit(ctx, bs.localRef) + require.NoError(t, err) + localManifestOID, _, err := localAPI.ResolvePathObject(ctx, localHead, "manifest") + require.NoError(t, err) + + // Advance remote independently. + _, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "manifest", []byte("remote\n"), testIdentity()) + require.NoError(t, err) + remoteHead, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + remoteManifestOID, _, err := remoteAPI.ResolvePathObject(ctx, remoteHead, "manifest") + require.NoError(t, err) + + // Expected version matches local, but remote is truth, so this should fail. + _, err = bs.CheckAndPut(ctx, localManifestOID.String(), "manifest", int64(len("new\n")), bytes.NewReader([]byte("new\n"))) + var capErr CheckAndPutError + require.ErrorAs(t, err, &capErr) + require.Equal(t, "manifest", capErr.Key) + require.Equal(t, localManifestOID.String(), capErr.ExpectedVersion) + require.Equal(t, remoteManifestOID.String(), capErr.ActualVersion) +} + +func TestGitBlobstore_RemoteManaged_PutOverwritesDivergedLocalRef_NoMergeCommit(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") + require.NoError(t, err) + remoteRunner, err := git.NewRunner(remoteRepo.GitDir) + require.NoError(t, err) + remoteAPI := git.NewGitAPIImpl(remoteRunner) + + // Seed + advance remote. + _, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "base", []byte("base\n"), testIdentity()) + require.NoError(t, err) + _, err = writeKeyToRef(ctx, remoteAPI, DoltDataRef, "remote", []byte("remote\n"), testIdentity()) + require.NoError(t, err) + remoteHeadBefore, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + localRunner, err := git.NewRunner(localRepo.GitDir) + require.NoError(t, err) + _, err = localRunner.Run(ctx, git.RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + require.NoError(t, err) + localAPI := git.NewGitAPIImpl(localRunner) + + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ + RemoteName: "origin", + Identity: testIdentity(), + }) + require.NoError(t, err) + + // Make local diverge from remote. + require.NoError(t, localAPI.FetchRef(ctx, "origin", DoltDataRef, bs.remoteTrackingRef)) + require.NoError(t, localAPI.UpdateRef(ctx, bs.localRef, remoteHeadBefore, "set local to remote head")) + _, err = writeKeyToRef(ctx, localAPI, bs.localRef, "local", []byte("local\n"), testIdentity()) + require.NoError(t, err) + + _, err = PutBytes(ctx, bs, "k", []byte("from local\n")) + require.NoError(t, err) + + remoteHeadAfter, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef) + require.NoError(t, err) + + // New remote head is a normal (non-merge) commit built on remoteHeadBefore. + out, err := remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHeadAfter.String()+"^") + require.NoError(t, err) + require.Equal(t, remoteHeadBefore.String(), string(bytes.TrimSpace(out))) + _, err = remoteRunner.Run(ctx, git.RunOptions{}, "rev-parse", remoteHeadAfter.String()+"^2") + require.Error(t, err) + + // Local-only divergence should not be present on remote. + _, _, err = remoteAPI.ResolvePathObject(ctx, remoteHeadAfter, "local") + require.Error(t, err) +} + +func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "present": []byte("x"), }, "seed") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) _, _, err = GetBytes(ctx, bs, "missing", AllRange) @@ -141,21 +556,20 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) maxValue := int64(16 * 1024) testData := rangeData(0, maxValue) - commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "range": testData, }, "range fixture") require.NoError(t, err) - bs, err := NewGitBlobstore(repo.GitDir, DoltDataRef) + bs, err := NewGitBlobstore(localRepo.GitDir, DoltDataRef) require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) rangeOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "range") @@ -234,10 +648,11 @@ func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) want := []byte("hello put\n") @@ -259,10 +674,11 @@ func TestGitBlobstore_Concatenate_Basic(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) _, err = PutBytes(ctx, bs, "a", []byte("hi ")) @@ -284,10 +700,11 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithOptions(repo.GitDir, DoltDataRef, GitBlobstoreOptions{ + bs, err := NewGitBlobstoreWithOptions(localRepo.GitDir, DoltDataRef, GitBlobstoreOptions{ Identity: testIdentity(), MaxPartSize: 1024, }) @@ -307,7 +724,7 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { require.NotEmpty(t, ver) // Verify the resulting key is stored as a chunked tree (not a single blob). - head, ok, err := bs.api.TryResolveRefCommit(ctx, DoltDataRef) + head, ok, err := bs.api.TryResolveRefCommit(ctx, bs.localRef) require.NoError(t, err) require.True(t, ok) oid, typ, err := bs.api.ResolvePathObject(ctx, head, "c") @@ -318,7 +735,7 @@ func TestGitBlobstore_Concatenate_ChunkedResult(t *testing.T) { parts, err := bs.api.ListTree(ctx, head, "c") require.NoError(t, err) require.GreaterOrEqual(t, len(parts), 2) - require.Equal(t, "00000001", parts[0].Name) + require.Equal(t, "0001", parts[0].Name) got, ver2, err := GetBytes(ctx, bs, "c", AllRange) require.NoError(t, err) @@ -330,10 +747,11 @@ func TestGitBlobstore_Concatenate_KeyExistsFastSucceeds(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) ver1, err := PutBytes(ctx, bs, "c", []byte("original")) @@ -359,10 +777,13 @@ func TestGitBlobstore_Concatenate_MissingSourceIsNotFound(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "present": []byte("x"), + }, "seed") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) _, err = PutBytes(ctx, bs, "present", []byte("x")) @@ -380,10 +801,11 @@ func TestGitBlobstore_Concatenate_EmptySourcesErrors(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) _, err = bs.Concatenate(ctx, "c", nil) @@ -400,10 +822,11 @@ func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) ver1, err := PutBytes(ctx, bs, "k", []byte("v1\n")) @@ -420,22 +843,6 @@ func TestGitBlobstore_Put_IdempotentIfKeyExists(t *testing.T) { require.Equal(t, []byte("v1\n"), got) } -type hookGitAPI struct { - git.GitAPI - - ref string - // if set, called once before the first UpdateRefCAS executes. - onFirstCAS func(ctx context.Context, old git.OID) - did atomic.Bool -} - -func (h *hookGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error { - if h.onFirstCAS != nil && !h.did.Swap(true) && ref == h.ref { - h.onFirstCAS(ctx, oldOID) - } - return h.GitAPI.UpdateRefCAS(ctx, ref, newOID, oldOID, msg) -} - func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string, data []byte, author *git.Identity) (git.OID, error) { parent, ok, err := api.TryResolveRefCommit(ctx, ref) if err != nil { @@ -490,54 +897,6 @@ func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string, return commitOID, nil } -func TestGitBlobstore_Put_ContentionRetryPreservesOtherKey(t *testing.T) { - requireGitOnPath(t) - - ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) - - // Seed the ref so Put takes the CAS path. - _, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ - "base": []byte("base\n"), - }, "seed") - require.NoError(t, err) - - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) - require.NoError(t, err) - - origAPI := bs.api - h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef} - h.onFirstCAS = func(ctx context.Context, old git.OID) { - // Advance the ref to simulate another writer committing concurrently. - _, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity()) - } - bs.api = h - - ver, err := PutBytes(ctx, bs, "k", []byte("mine\n")) - require.NoError(t, err) - require.NotEmpty(t, ver) - - got, ver2, err := GetBytes(ctx, bs, "k", AllRange) - require.NoError(t, err) - require.Equal(t, ver, ver2) - require.Equal(t, []byte("mine\n"), got) - - got, _, err = GetBytes(ctx, bs, "external", AllRange) - require.NoError(t, err) - require.Equal(t, []byte("external\n"), got) - - got, _, err = GetBytes(ctx, bs, "base", AllRange) - require.NoError(t, err) - require.Equal(t, []byte("base\n"), got) - - // Sanity: BlobReader path still works for the new commit. - rc, _, _, err := bs.Get(ctx, "k", AllRange) - require.NoError(t, err) - _, _ = io.ReadAll(rc) - _ = rc.Close() -} - type failReader struct { called atomic.Bool } @@ -551,10 +910,11 @@ func TestGitBlobstore_CheckAndPut_CreateOnly(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) + _, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, nil, "seed empty") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) want := []byte("created\n") @@ -572,18 +932,17 @@ func TestGitBlobstore_CheckAndPut_MismatchDoesNotRead(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) - commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "k": []byte("base\n"), }, "seed") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") @@ -600,19 +959,18 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) { requireGitOnPath(t) ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) + remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx) - commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + commit, err := remoteRepo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ "k": []byte("base\n"), "keep": []byte("keep\n"), }, "seed") require.NoError(t, err) - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + bs, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity()) require.NoError(t, err) - runner, err := git.NewRunner(repo.GitDir) + runner, err := git.NewRunner(remoteRepo.GitDir) require.NoError(t, err) api := git.NewGitAPIImpl(runner) keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") @@ -633,47 +991,3 @@ func TestGitBlobstore_CheckAndPut_UpdateSuccess(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("keep\n"), got) } - -func TestGitBlobstore_CheckAndPut_ConcurrentUnrelatedUpdateStillSucceeds(t *testing.T) { - requireGitOnPath(t) - - ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") - require.NoError(t, err) - - commit, err := repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ - "k": []byte("base\n"), - }, "seed") - require.NoError(t, err) - - bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) - require.NoError(t, err) - - runner, err := git.NewRunner(repo.GitDir) - require.NoError(t, err) - api := git.NewGitAPIImpl(runner) - keyOID, _, err := api.ResolvePathObject(ctx, git.OID(commit), "k") - require.NoError(t, err) - - origAPI := bs.api - h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef} - h.onFirstCAS = func(ctx context.Context, old git.OID) { - // Advance the ref (without touching "k") to make UpdateRefCAS fail. - _, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity()) - } - bs.api = h - - ver2, err := bs.CheckAndPut(ctx, keyOID.String(), "k", 0, bytes.NewReader([]byte("mine\n"))) - require.NoError(t, err) - require.NotEmpty(t, ver2) - require.NotEqual(t, keyOID.String(), ver2) - - got, ver3, err := GetBytes(ctx, bs, "k", AllRange) - require.NoError(t, err) - require.Equal(t, ver2, ver3) - require.Equal(t, []byte("mine\n"), got) - - got, _, err = GetBytes(ctx, bs, "external", AllRange) - require.NoError(t, err) - require.Equal(t, []byte("external\n"), got) -} diff --git a/go/store/blobstore/git_refs.go b/go/store/blobstore/git_refs.go index a2d2e33e1c9..fcef9d6b7ed 100644 --- a/go/store/blobstore/git_refs.go +++ b/go/store/blobstore/git_refs.go @@ -14,14 +14,29 @@ package blobstore -import "fmt" +import ( + "fmt" + "strings" +) -// DoltDataRef is the local writable ref backing a git-object-db dolt blobstore. -// It is the state that local operations mutate and (eventually) attempt to push. +// DoltDataRef is the default remote ref backing a git-object-db dolt blobstore. const DoltDataRef = "refs/dolt/data" -// DoltRemoteTrackingDataRef returns the remote-tracking ref for a named remote. -// This ref represents the remote's DoltDataRef as of the last fetch. -func DoltRemoteTrackingDataRef(remote string) string { - return fmt.Sprintf("refs/dolt/remotes/%s/data", remote) +func trimRefsPrefix(ref string) string { + return strings.TrimPrefix(ref, "refs/") +} + +// RemoteTrackingRef returns the remote-tracking ref for a named remote and remote ref. +// This ref represents the remote's |remoteRef| as of the last fetch. +func RemoteTrackingRef(remoteName, remoteRef string) string { + return fmt.Sprintf("refs/dolt/remotes/%s/%s", remoteName, trimRefsPrefix(remoteRef)) +} + +// OwnedLocalRef returns a UUID-owned local ref for a GitBlobstore instance. +// +// Note: these UUID refs can accumulate in the local repo over time. This is +// intentional for now; callers that want best-effort cleanup can use +// (*GitBlobstore).CleanupOwnedLocalRef. +func OwnedLocalRef(remoteName, remoteRef, uuid string) string { + return fmt.Sprintf("refs/dolt/blobstore/%s/%s/%s", remoteName, trimRefsPrefix(remoteRef), uuid) } diff --git a/go/store/blobstore/internal/git/api.go b/go/store/blobstore/internal/git/api.go index cabda14d608..9410875dcbe 100644 --- a/go/store/blobstore/internal/git/api.go +++ b/go/store/blobstore/internal/git/api.go @@ -108,6 +108,17 @@ type GitAPI interface { // Equivalent plumbing: // GIT_DIR=... git update-ref -m UpdateRef(ctx context.Context, ref string, newOID OID, msg string) error + + // FetchRef fetches |srcRef| from |remote| and updates |dstRef| in the local repo. + // It is expected to force-update (tracking refs follow remote truth). + // Equivalent plumbing: + // GIT_DIR=... git fetch +: + FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error + + // PushRefWithLease pushes |srcRef| to |dstRef| on |remote|, but only if the remote's |dstRef| + // equals |expectedDstOID| (force-with-lease). + // Equivalent plumbing: GIT_DIR=... git push --force-with-lease=: : + PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error } // TreeEntry describes one entry in a git tree listing. diff --git a/go/store/blobstore/internal/git/impl.go b/go/store/blobstore/internal/git/impl.go index 8e78ccfaf64..b9984f883ff 100644 --- a/go/store/blobstore/internal/git/impl.go +++ b/go/store/blobstore/internal/git/impl.go @@ -282,6 +282,43 @@ func (a *GitAPIImpl) UpdateRef(ctx context.Context, ref string, newOID OID, msg return err } +func (a *GitAPIImpl) FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error { + if remote == "" { + return fmt.Errorf("git fetch: remote is required") + } + if srcRef == "" { + return fmt.Errorf("git fetch: src ref is required") + } + if dstRef == "" { + return fmt.Errorf("git fetch: dst ref is required") + } + // Forced refspec to keep tracking refs in sync with remote truth. + srcRef = strings.TrimPrefix(srcRef, "+") + refspec := "+" + srcRef + ":" + dstRef + _, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", remote, refspec) + return err +} + +func (a *GitAPIImpl) PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error { + if remote == "" { + return fmt.Errorf("git push: remote is required") + } + if srcRef == "" { + return fmt.Errorf("git push: src ref is required") + } + if dstRef == "" { + return fmt.Errorf("git push: dst ref is required") + } + if expectedDstOID == "" { + return fmt.Errorf("git push: expected dst oid is required") + } + srcRef = strings.TrimPrefix(srcRef, "+") + refspec := srcRef + ":" + dstRef + lease := "--force-with-lease=" + dstRef + ":" + expectedDstOID.String() + _, err := a.r.Run(ctx, RunOptions{}, "push", "--porcelain", lease, remote, refspec) + return err +} + func isRefNotFoundErr(err error) bool { ce, ok := err.(*CmdError) if !ok { diff --git a/go/store/blobstore/internal/git/impl_test.go b/go/store/blobstore/internal/git/impl_test.go index 216cb880c79..4ca4ff67c7f 100644 --- a/go/store/blobstore/internal/git/impl_test.go +++ b/go/store/blobstore/internal/git/impl_test.go @@ -35,20 +35,26 @@ func tempIndexFile(t *testing.T) string { return filepath.Join(dir, "index") } -func TestGitAPIImpl_HashObject_RoundTrip(t *testing.T) { - t.Parallel() +func newTestRepo(t *testing.T, ctx context.Context) (*gitrepo.Repo, *Runner, GitAPI) { + t.Helper() - ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") + repoDir := filepath.Join(t.TempDir(), "repo.git") + repo, err := gitrepo.InitBare(ctx, repoDir) if err != nil { t.Fatal(err) } - r, err := NewRunner(repo.GitDir) if err != nil { t.Fatal(err) } - api := NewGitAPIImpl(r) + return repo, r, NewGitAPIImpl(r) +} + +func TestGitAPIImpl_HashObject_RoundTrip(t *testing.T) { + t.Parallel() + + ctx := context.Background() + _, _, api := newTestRepo(t, ctx) want := []byte("hello dolt\n") oid, err := api.HashObject(ctx, bytes.NewReader(want)) @@ -86,16 +92,7 @@ func TestGitAPIImpl_HashObject_Empty(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) oid, err := api.HashObject(ctx, bytes.NewReader(nil)) if err != nil { @@ -118,18 +115,9 @@ func TestGitAPIImpl_ResolveRefCommit_Missing(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) - _, err = api.ResolveRefCommit(ctx, "refs/does/not/exist") + _, err := api.ResolveRefCommit(ctx, "refs/does/not/exist") if err == nil { t.Fatalf("expected error") } @@ -143,16 +131,7 @@ func TestGitAPIImpl_ResolveRefCommit_Exists(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -184,16 +163,7 @@ func TestGitAPIImpl_WriteTree_FromEmptyIndex(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -242,16 +212,7 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_ReplacesExistingEntry(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -308,16 +269,7 @@ func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing. t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -370,16 +322,7 @@ func TestGitAPIImpl_ResolvePathObject_BlobAndTree(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -426,16 +369,7 @@ func TestGitAPIImpl_ListTree_NonRecursive(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -513,16 +447,7 @@ func TestGitAPIImpl_RemoveIndexPaths_RemovesFromIndex(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) indexFile := tempIndexFile(t) if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { @@ -580,16 +505,7 @@ func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, r, api := newTestRepo(t, ctx) // Base commit with one file. baseIndex := tempIndexFile(t) @@ -692,16 +608,7 @@ func TestGitAPIImpl_UpdateRef_And_CAS(t *testing.T) { t.Parallel() ctx := context.Background() - repo, err := gitrepo.InitBareTemp(ctx, "") - if err != nil { - t.Fatal(err) - } - - r, err := NewRunner(repo.GitDir) - if err != nil { - t.Fatal(err) - } - api := NewGitAPIImpl(r) + _, _, api := newTestRepo(t, ctx) // Create two commits on the same tree. indexFile := tempIndexFile(t) @@ -773,3 +680,165 @@ func TestGitAPIImpl_UpdateRef_And_CAS(t *testing.T) { t.Fatalf("ref changed unexpectedly: ok=%v got=%q want=%q", ok, got, c2) } } + +func TestGitAPIImpl_FetchRef_ForcedUpdatesTrackingRef(t *testing.T) { + t.Parallel() + + ctx := context.Background() + remoteRepo, _, remoteAPI := newTestRepo(t, ctx) + + // Create two commits on the same tree in the remote. + indexFile := tempIndexFile(t) + if err := remoteAPI.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + treeOID, err := remoteAPI.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + c1, err := remoteAPI.CommitTree(ctx, treeOID, nil, "c1", testAuthor()) + if err != nil { + t.Fatal(err) + } + c2, err := remoteAPI.CommitTree(ctx, treeOID, nil, "c2", testAuthor()) + if err != nil { + t.Fatal(err) + } + if c1 == c2 { + c2, err = remoteAPI.CommitTree(ctx, treeOID, nil, "c2b", testAuthor()) + if err != nil { + t.Fatal(err) + } + if c1 == c2 { + t.Fatalf("expected distinct commit oids") + } + } + + remoteDataRef := "refs/dolt/data" + if err := remoteAPI.UpdateRef(ctx, remoteDataRef, c2, "seed remote"); err != nil { + t.Fatal(err) + } + + _, localRunner, localAPI := newTestRepo(t, ctx) + _, err = localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + if err != nil { + t.Fatal(err) + } + + dstRef := "refs/dolt/remotes/origin/data" + if err := localAPI.FetchRef(ctx, "origin", remoteDataRef, dstRef); err != nil { + t.Fatal(err) + } + got, err := localAPI.ResolveRefCommit(ctx, dstRef) + if err != nil { + t.Fatal(err) + } + if got != c2 { + t.Fatalf("tracking ref mismatch: got %q, want %q", got, c2) + } + + // Rewind the remote ref to c1 and ensure a subsequent fetch forces the tracking ref backwards. + if err := remoteAPI.UpdateRef(ctx, remoteDataRef, c1, "rewind remote"); err != nil { + t.Fatal(err) + } + if err := localAPI.FetchRef(ctx, "origin", remoteDataRef, dstRef); err != nil { + t.Fatal(err) + } + got, err = localAPI.ResolveRefCommit(ctx, dstRef) + if err != nil { + t.Fatal(err) + } + if got != c1 { + t.Fatalf("tracking ref mismatch after rewind: got %q, want %q", got, c1) + } +} + +func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T) { + t.Parallel() + + ctx := context.Background() + remoteRepo, _, remoteAPI := newTestRepo(t, ctx) + + // Seed remote ref with r1, then later advance to r2. + indexFile := tempIndexFile(t) + if err := remoteAPI.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + treeOID, err := remoteAPI.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + r1, err := remoteAPI.CommitTree(ctx, treeOID, nil, "r1", testAuthor()) + if err != nil { + t.Fatal(err) + } + r2, err := remoteAPI.CommitTree(ctx, treeOID, nil, "r2", testAuthor()) + if err != nil { + t.Fatal(err) + } + if r1 == r2 { + r2, err = remoteAPI.CommitTree(ctx, treeOID, nil, "r2b", testAuthor()) + if err != nil { + t.Fatal(err) + } + if r1 == r2 { + t.Fatalf("expected distinct commit oids") + } + } + + remoteDataRef := "refs/dolt/data" + if err := remoteAPI.UpdateRef(ctx, remoteDataRef, r1, "seed remote"); err != nil { + t.Fatal(err) + } + + _, localRunner, localAPI := newTestRepo(t, ctx) + _, err = localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir) + if err != nil { + t.Fatal(err) + } + + // Create a local commit l1 and set local refs/dolt/data to it (src ref for push). + localIndex := tempIndexFile(t) + if err := localAPI.ReadTreeEmpty(ctx, localIndex); err != nil { + t.Fatal(err) + } + localTree, err := localAPI.WriteTree(ctx, localIndex) + if err != nil { + t.Fatal(err) + } + l1, err := localAPI.CommitTree(ctx, localTree, nil, "l1", testAuthor()) + if err != nil { + t.Fatal(err) + } + if err := localAPI.UpdateRef(ctx, remoteDataRef, l1, "set local src"); err != nil { + t.Fatal(err) + } + + // Lease matches remote (r1) -> push should succeed and overwrite remoteDataRef to l1. + if err := localAPI.PushRefWithLease(ctx, "origin", remoteDataRef, remoteDataRef, r1); err != nil { + t.Fatal(err) + } + got, err := remoteAPI.ResolveRefCommit(ctx, remoteDataRef) + if err != nil { + t.Fatal(err) + } + if got != l1 { + t.Fatalf("remote ref mismatch after push: got %q, want %q", got, l1) + } + + // Advance remote to r2, then attempt a stale-lease push expecting r1 -> should fail and not clobber r2. + if err := remoteAPI.UpdateRef(ctx, remoteDataRef, r2, "advance remote"); err != nil { + t.Fatal(err) + } + err = localAPI.PushRefWithLease(ctx, "origin", remoteDataRef, remoteDataRef, r1) + if err == nil { + t.Fatalf("expected stale lease push to fail") + } + got, err = remoteAPI.ResolveRefCommit(ctx, remoteDataRef) + if err != nil { + t.Fatal(err) + } + if got != r2 { + t.Fatalf("remote ref changed unexpectedly on stale lease: got %q, want %q", got, r2) + } +} diff --git a/go/store/blobstore/internal/git/temp_index.go b/go/store/blobstore/internal/git/temp_index.go new file mode 100644 index 00000000000..9403d25c458 --- /dev/null +++ b/go/store/blobstore/internal/git/temp_index.go @@ -0,0 +1,40 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package git + +import ( + "os" + "path/filepath" +) + +// NewTempIndex creates a unique temporary git index file (for use as GIT_INDEX_FILE). +// The index is created outside of any repo's GIT_DIR to avoid read-only repos and to +// avoid leaving scratch files in the repo on crashes. +// +// Note: git may also create a sibling lock file (.lock) during index writes. +func NewTempIndex() (dir, indexFile string, cleanup func(), err error) { + f, err := os.CreateTemp("", "dolt-git-index-") + if err != nil { + return "", "", nil, err + } + indexFile = f.Name() + _ = f.Close() + dir = filepath.Dir(indexFile) + cleanup = func() { + _ = os.Remove(indexFile) + _ = os.Remove(indexFile + ".lock") + } + return dir, indexFile, cleanup, nil +} diff --git a/go/store/nbs/git_blobstore_read_smoke_test.go b/go/store/nbs/git_blobstore_read_smoke_test.go index 8be83c3e0c2..700c5bc3b47 100644 --- a/go/store/nbs/git_blobstore_read_smoke_test.go +++ b/go/store/nbs/git_blobstore_read_smoke_test.go @@ -35,7 +35,7 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) { } ctx := context.Background() - repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + remoteRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/remote.git") require.NoError(t, err) // Seed a valid v5 manifest with no tables. This should allow NBS to open @@ -58,14 +58,20 @@ func TestGitBlobstoreReadSmoke_ManifestAndTableAccessPatterns(t *testing.T) { table[i] = byte(i % 251) } - commit, err := repo.SetRefToTree(ctx, blobstore.DoltDataRef, map[string][]byte{ + commit, err := remoteRepo.SetRefToTree(ctx, blobstore.DoltDataRef, map[string][]byte{ "manifest": buf.Bytes(), "table": table, }, "seed refs/dolt/data for smoke test") require.NoError(t, err) require.NotEmpty(t, commit) - bs, err := blobstore.NewGitBlobstore(repo.GitDir, blobstore.DoltDataRef) + localRepo, err := gitrepo.InitBare(ctx, t.TempDir()+"/local.git") + require.NoError(t, err) + cmd := exec.CommandContext(ctx, "git", "--git-dir", localRepo.GitDir, "remote", "add", "origin", remoteRepo.GitDir) + remoteAddOut, err := cmd.CombinedOutput() + require.NoError(t, err, "git remote add failed: %s", string(remoteAddOut)) + + bs, err := blobstore.NewGitBlobstore(localRepo.GitDir, blobstore.DoltDataRef) require.NoError(t, err) // 1) Manifest read path via blobstoreManifest.ParseIfExists.