Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 63 additions & 27 deletions go/store/blobstore/git_blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ func (gbs *GitBlobstore) syncForRead(ctx context.Context) error {

// 1) Fetch remote ref into our remote-tracking ref.
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
// An absent remote ref is treated as an empty store. This is required for NBS open
// (manifest ParseIfExists) against a freshly-initialized remote.
var rnf *git.RefNotFoundError
if errors.As(err, &rnf) && rnf.Ref == gbs.remoteRef {
return nil
}
return err
}

Expand All @@ -542,6 +548,49 @@ func (gbs *GitBlobstore) syncForRead(ctx context.Context) error {
return gbs.mergeCacheFromHead(ctx, remoteHead)
}

// gitblobstoreFetchRefError tags an error as having been produced by the
// FetchRef step of a remote-managed write. fetchAlignAndMergeForWrite wraps
// fetch failures in it so that its caller can pick them out with errors.As and
// treat them differently from failures of the later steps.
type gitblobstoreFetchRefError struct {
	// err is the underlying error returned by FetchRef.
	err error
}

// Error reports the wrapped error's message unchanged.
func (fe *gitblobstoreFetchRefError) Error() string {
	return fe.err.Error()
}

// Unwrap exposes the wrapped error so errors.Is and errors.As can look
// through this wrapper.
func (fe *gitblobstoreFetchRefError) Unwrap() error {
	return fe.err
}

// fetchAlignAndMergeForWrite brings local state in line with the remote before
// a write is applied.
//
// It fetches gbs.remoteRef from gbs.remoteName into gbs.remoteTrackingRef,
// resolves the fetched commit, force-points gbs.localRef at that commit, and
// merges the cache from it.
//
// Results:
//   - (head, true, nil): the remote ref exists; head is its commit and the
//     local ref and cache have been aligned to it.
//   - ("", false, nil): the remote ref does not exist. The store is treated as
//     empty and no local ref or cache update is performed.
//   - ("", false, err): a step failed. A FetchRef failure is wrapped in
//     *gitblobstoreFetchRefError; failures of later steps are returned as-is.
func (gbs *GitBlobstore) fetchAlignAndMergeForWrite(ctx context.Context) (remoteHead git.OID, ok bool, err error) {
	if fetchErr := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); fetchErr != nil {
		var missing *git.RefNotFoundError
		if !errors.As(fetchErr, &missing) || missing.Ref != gbs.remoteRef {
			return "", false, &gitblobstoreFetchRefError{err: fetchErr}
		}
		// The remote ref is absent: Git has no notion of an "empty ref", so
		// this means the ref is unborn (no commits yet). Report an empty store
		// and let the write bootstrap it. Callers see ok=false with an empty
		// parent and will:
		//   - build a root commit from an empty tree (no parent),
		//   - create/update gbs.localRef to that commit, and
		//   - push with an empty expected dst OID, which creates gbs.remoteRef
		//     on the remote.
		return "", false, nil
	}

	head, resolved, resolveErr := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
	switch {
	case resolveErr != nil:
		return "", false, resolveErr
	case !resolved:
		// The fetch succeeded, so the tracking ref is expected to resolve.
		return "", false, &git.RefNotFoundError{Ref: gbs.remoteTrackingRef}
	}

	// The remote is the source of truth: force our owned local ref onto its head.
	if updateErr := gbs.api.UpdateRef(ctx, gbs.localRef, head, "gitblobstore: sync write"); updateErr != nil {
		return "", false, updateErr
	}

	// Bring the cache up to date with what was just fetched.
	if mergeErr := gbs.mergeCacheFromHead(ctx, head); mergeErr != nil {
		return "", false, mergeErr
	}

	return head, true, nil
}

func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string, build func(parent git.OID, ok bool) (git.OID, error)) (string, error) {
if err := gbs.validateRemoteManaged(); err != nil {
return "", err
Expand All @@ -553,43 +602,30 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string

var ver string
op := func() error {
// 1) Fetch remote state into local tracking ref.
if err := gbs.api.FetchRef(ctx, gbs.remoteName, gbs.remoteRef, gbs.remoteTrackingRef); err != nil {
return err
}
remoteHead, okRemote, err := gbs.api.TryResolveRefCommit(ctx, gbs.remoteTrackingRef)
remoteHead, okRemote, err := gbs.fetchAlignAndMergeForWrite(ctx)
if err != nil {
return backoff.Permanent(err)
}
if !okRemote {
return backoff.Permanent(&git.RefNotFoundError{Ref: gbs.remoteTrackingRef})
}

// 2) Force-set owned local ref to remote head (remote is source-of-truth).
if err := gbs.api.UpdateRef(ctx, gbs.localRef, remoteHead, "gitblobstore: sync write"); err != nil {
return backoff.Permanent(err)
}

// 2b) Merge cache to reflect fetched contents.
if err := gbs.mergeCacheFromHead(ctx, remoteHead); err != nil {
var fe *gitblobstoreFetchRefError
if errors.As(err, &fe) {
return fe.err
}
return backoff.Permanent(err)
}

// 3) Apply this operation's changes on top of the remote head.
newCommit, err := build(remoteHead, true)
// Apply this operation's changes on top of the remote head (or empty store).
newCommit, err := build(remoteHead, okRemote)
if err != nil {
return backoff.Permanent(err)
}
if err := gbs.api.UpdateRef(ctx, gbs.localRef, newCommit, msg); err != nil {
return backoff.Permanent(err)
}

// 4) Push local ref to remote with lease.
// Push local ref to remote with lease.
if err := gbs.api.PushRefWithLease(ctx, gbs.remoteName, gbs.localRef, gbs.remoteRef, remoteHead); err != nil {
return err
}

// 5) Merge cache to reflect the new head after a successful push.
// Merge cache to reflect the new head after a successful push.
// When we successfully push a new head, it is safe (and required for correctness)
// to overwrite cache entries to reflect the new head's tree.
if err := gbs.mergeCacheFromHeadOverwriteAll(ctx, newCommit); err != nil {
Expand All @@ -614,15 +650,15 @@ func (gbs *GitBlobstore) remoteManagedWrite(ctx context.Context, key, msg string
}

func (gbs *GitBlobstore) putWithRemoteSync(ctx context.Context, key string, plan putPlan, msg string) (string, error) {
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, plan, msg)
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) {
return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, plan, msg)
})
}

func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader, msg string) (string, error) {
var cachedPlan *putPlan
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, _ bool) (git.OID, error) {
actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, true, key)
return gbs.remoteManagedWrite(ctx, key, msg, func(remoteHead git.OID, ok bool) (git.OID, error) {
actualKeyVersion, err := gbs.currentKeyVersion(ctx, remoteHead, ok, key)
if err != nil {
return git.OID(""), err
}
Expand All @@ -636,7 +672,7 @@ func (gbs *GitBlobstore) checkAndPutWithRemoteSync(ctx context.Context, expected
}
cachedPlan = &plan
}
return gbs.buildCommitForKeyWrite(ctx, remoteHead, true, key, *cachedPlan, msg)
return gbs.buildCommitForKeyWrite(ctx, remoteHead, ok, key, *cachedPlan, msg)
})
}

Expand Down
36 changes: 36 additions & 0 deletions go/store/blobstore/git_blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,42 @@ func TestGitBlobstore_RemoteManaged_PutPushesToRemote(t *testing.T) {
require.Equal(t, []byte("from local\n"), got)
}

// TestGitBlobstore_RemoteManaged_PutBootstrapsEmptyRemote checks that a Put
// against a remote with no data ref succeeds, creates the ref on the remote,
// and leaves the written key readable from the remote's new head.
func TestGitBlobstore_RemoteManaged_PutBootstrapsEmptyRemote(t *testing.T) {
	requireGitOnPath(t)

	ctx := context.Background()
	// refs/dolt/data is deliberately not seeded in the remote, so the remote
	// looks truly empty to the blobstore.
	remoteRepo, localRepo, _ := newRemoteAndLocalRepos(t, ctx)

	store, err := NewGitBlobstoreWithIdentity(localRepo.GitDir, DoltDataRef, testIdentity())
	require.NoError(t, err)

	payload := []byte("bootstrapped\n")
	version, err := store.Put(ctx, "k", int64(len(payload)), bytes.NewReader(payload))
	require.NoError(t, err)
	require.NotEmpty(t, version)

	// Inspect the remote repository directly: it should now have the data ref,
	// and that ref's tree should contain the key.
	runner, err := git.NewRunner(remoteRepo.GitDir)
	require.NoError(t, err)
	remoteAPI := git.NewGitAPIImpl(runner)

	head, err := remoteAPI.ResolveRefCommit(ctx, DoltDataRef)
	require.NoError(t, err)
	require.NotEmpty(t, head)

	blobOID, objType, err := remoteAPI.ResolvePathObject(ctx, head, "k")
	require.NoError(t, err)
	require.Equal(t, git.ObjectTypeBlob, objType)

	blob, err := remoteAPI.BlobReader(ctx, blobOID)
	require.NoError(t, err)
	contents, readErr := io.ReadAll(blob)
	// Close before asserting so the reader is released even if the read failed;
	// the close error itself is not what this test is checking.
	_ = blob.Close()
	require.NoError(t, readErr)
	require.Equal(t, payload, contents)
}

type hookPushGitAPI struct {
git.GitAPI
onFirstPush func(ctx context.Context)
Expand Down
3 changes: 2 additions & 1 deletion go/store/blobstore/internal/git/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ type GitAPI interface {
FetchRef(ctx context.Context, remote string, srcRef string, dstRef string) error

// PushRefWithLease pushes |srcRef| to |dstRef| on |remote|, but only if the remote's |dstRef|
// equals |expectedDstOID| (force-with-lease).
// equals |expectedDstOID| (force-with-lease). If |expectedDstOID| is empty, it enforces that
// the remote |dstRef| is missing (bootstrap / create-if-missing semantics).
// Equivalent plumbing: GIT_DIR=... git push --force-with-lease=<dstRef>:<expectedDstOID> <remote> <srcRef>:<dstRef>
PushRefWithLease(ctx context.Context, remote string, srcRef string, dstRef string, expectedDstOID OID) error
}
Expand Down
19 changes: 16 additions & 3 deletions go/store/blobstore/internal/git/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ func (a *GitAPIImpl) FetchRef(ctx context.Context, remote string, srcRef string,
srcRef = strings.TrimPrefix(srcRef, "+")
refspec := "+" + srcRef + ":" + dstRef
_, err := a.r.Run(ctx, RunOptions{}, "fetch", "--no-tags", remote, refspec)
if err != nil && isRemoteRefNotFoundErr(err) {
return &RefNotFoundError{Ref: srcRef}
}
return err
}

Expand All @@ -334,9 +337,6 @@ func (a *GitAPIImpl) PushRefWithLease(ctx context.Context, remote string, srcRef
if dstRef == "" {
return fmt.Errorf("git push: dst ref is required")
}
if expectedDstOID == "" {
return fmt.Errorf("git push: expected dst oid is required")
}
srcRef = strings.TrimPrefix(srcRef, "+")
refspec := srcRef + ":" + dstRef
lease := "--force-with-lease=" + dstRef + ":" + expectedDstOID.String()
Expand All @@ -360,6 +360,19 @@ func isRefNotFoundErr(err error) bool {
strings.Contains(msg, "not a valid object name")
}

// isRemoteRefNotFoundErr reports whether err is a *CmdError whose output says
// that the requested ref does not exist on the remote. The match is a
// case-insensitive substring search. Any other error type, including nil,
// yields false.
func isRemoteRefNotFoundErr(err error) bool {
	cmdErr, isCmdErr := err.(*CmdError)
	if !isCmdErr {
		return false
	}

	// Typical fetch failure when the remote ref doesn't exist:
	//   fatal: couldn't find remote ref refs/dolt/data
	// The other phrasings are accepted as alternative wordings of the same
	// condition.
	markers := [...]string{
		"couldn't find remote ref",
		"could not find remote ref",
		"remote ref does not exist",
	}

	output := strings.ToLower(string(cmdErr.Output))
	for _, marker := range markers {
		if strings.Contains(output, marker) {
			return true
		}
	}
	return false
}

func isPathNotFoundErr(err error) bool {
ce, ok := err.(*CmdError)
if !ok {
Expand Down
72 changes: 72 additions & 0 deletions go/store/blobstore/internal/git/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,33 @@ func TestGitAPIImpl_FetchRef_ForcedUpdatesTrackingRef(t *testing.T) {
}
}

// TestGitAPIImpl_FetchRef_MissingRemoteRefReturnsRefNotFound checks that
// fetching a ref the remote does not have fails with a *RefNotFoundError that
// names the requested remote ref.
func TestGitAPIImpl_FetchRef_MissingRemoteRefReturnsRefNotFound(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	remoteRepo, _, _ := newTestRepo(t, ctx)
	_, localRunner, localAPI := newTestRepo(t, ctx)

	// Point the local repo at the remote; the remote never gets the data ref.
	if _, err := localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir); err != nil {
		t.Fatal(err)
	}

	const (
		missingRef  = "refs/dolt/data"
		trackingRef = "refs/dolt/remotes/origin/data"
	)

	fetchErr := localAPI.FetchRef(ctx, "origin", missingRef, trackingRef)
	if fetchErr == nil {
		t.Fatal("expected error")
	}

	var notFound *RefNotFoundError
	if !errors.As(fetchErr, &notFound) {
		t.Fatalf("expected RefNotFoundError, got %T: %v", fetchErr, fetchErr)
	}
	if notFound.Ref != missingRef {
		t.Fatalf("expected missing ref %q, got %q", missingRef, notFound.Ref)
	}
}

func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -919,3 +946,48 @@ func TestGitAPIImpl_PushRefWithLease_SucceedsThenRejectsStaleLease(t *testing.T)
t.Fatalf("remote ref changed unexpectedly on stale lease: got %q, want %q", got, r2)
}
}

// TestGitAPIImpl_PushRefWithLease_CreatesWhenMissing checks that pushing with
// an empty expected destination OID creates the ref on a remote that does not
// have it yet, and that the remote ref ends up at the pushed commit.
func TestGitAPIImpl_PushRefWithLease_CreatesWhenMissing(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	remoteRepo, _, remoteAPI := newTestRepo(t, ctx)
	_, localRunner, localAPI := newTestRepo(t, ctx)

	if _, err := localRunner.Run(ctx, RunOptions{}, "remote", "add", "origin", remoteRepo.GitDir); err != nil {
		t.Fatal(err)
	}

	// Build a parentless commit over an empty tree in the local repo.
	idx := tempIndexFile(t)
	if err := localAPI.ReadTreeEmpty(ctx, idx); err != nil {
		t.Fatal(err)
	}
	emptyTree, err := localAPI.WriteTree(ctx, idx)
	if err != nil {
		t.Fatal(err)
	}
	commit, err := localAPI.CommitTree(ctx, emptyTree, nil, "l1", testAuthor())
	if err != nil {
		t.Fatal(err)
	}

	// The same ref name serves as the local source and the remote destination.
	const dataRef = "refs/dolt/data"
	if err := localAPI.UpdateRef(ctx, dataRef, commit, "set local src"); err != nil {
		t.Fatal(err)
	}

	// The remote has no such ref; an empty expected OID must create it.
	if err := localAPI.PushRefWithLease(ctx, "origin", dataRef, dataRef, ""); err != nil {
		t.Fatal(err)
	}

	remoteCommit, err := remoteAPI.ResolveRefCommit(ctx, dataRef)
	if err != nil {
		t.Fatal(err)
	}
	if remoteCommit != commit {
		t.Fatalf("remote ref mismatch after bootstrap push: got %q, want %q", remoteCommit, commit)
	}
}
Loading
Loading