diff --git a/go/store/blobstore/git_blobstore.go b/go/store/blobstore/git_blobstore.go index 23a4dc96725..018eb90ea08 100644 --- a/go/store/blobstore/git_blobstore.go +++ b/go/store/blobstore/git_blobstore.go @@ -16,9 +16,15 @@ package blobstore import ( "context" + "errors" "fmt" "io" + "os" + "path/filepath" "strings" + "time" + + "github.com/cenkalti/backoff/v4" git "github.com/dolthub/dolt/go/store/blobstore/internal/git" ) @@ -27,26 +33,37 @@ import ( // database (bare repo or .git directory). It stores keys as paths within the tree // of the commit referenced by a git ref (e.g. refs/dolt/data). // -// This initial implementation is intentionally READ-ONLY. Write-path methods -// (Put / CheckAndPut / Concatenate) return an explicit unimplemented error while -// we lock down read behavior for manifests and table files. +// This implementation is being developed in phases. Read paths are implemented first, +// then write paths are added incrementally. At the moment, Put is implemented, while +// CheckAndPut and Concatenate are still unimplemented. type GitBlobstore struct { gitDir string ref string runner *git.Runner api git.GitAPI + // identity, when non-nil, is used as the author/committer identity for new commits. + // When nil, we prefer whatever identity git derives from env/config, falling back + // to a deterministic default only if git reports the identity is missing. + identity *git.Identity } var _ Blobstore = (*GitBlobstore)(nil) -// NewGitBlobstore creates a new read-only GitBlobstore rooted at |gitDir| and |ref|. -// |gitDir| should point at a bare repo directory or a .git directory. +// NewGitBlobstore creates a new GitBlobstore rooted at |gitDir| and |ref|. +// |gitDir| should point at a bare repo directory or a .git directory. Put is implemented, +// while CheckAndPut and Concatenate are still unimplemented (see type-level docs). func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) { + return NewGitBlobstoreWithIdentity(gitDir, ref, nil) +} + +// NewGitBlobstoreWithIdentity creates a GitBlobstore rooted at |gitDir| and |ref|, optionally +// forcing an author/committer identity for write paths. +func NewGitBlobstoreWithIdentity(gitDir, ref string, identity *git.Identity) (*GitBlobstore, error) { r, err := git.NewRunner(gitDir) if err != nil { return nil, err } - return &GitBlobstore{gitDir: gitDir, ref: ref, runner: r, api: git.NewGitAPIImpl(r)}, nil + return &GitBlobstore{gitDir: gitDir, ref: ref, runner: r, api: git.NewGitAPIImpl(r), identity: identity}, nil } func (gbs *GitBlobstore) Path() string { @@ -157,10 +174,175 @@ func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) } func (l *limitReadCloser) Close() error { return l.c.Close() } func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) { - if _, err := normalizeGitTreePath(key); err != nil { + key, err := normalizeGitTreePath(key) + if err != nil { + return "", err + } + + // Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|, + // we can reuse the blob OID without re-reading |reader|. + blobOID, err := gbs.api.HashObject(ctx, reader) + if err != nil { + return "", err + } + + // Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop + // under the hood. This matches typical object-store semantics more closely than an + // unconditional ref update (which could clobber other keys). + const maxRetries = 31 // 32 total attempts (initial + retries) + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 5 * time.Millisecond + bo.Multiplier = 2 + bo.MaxInterval = 320 * time.Millisecond + bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed + bo.Reset() + policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx) + + var ver string + op := func() error { + parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + if err != nil { + return backoff.Permanent(err) + } + + newCommit, msg, err := gbs.buildPutCommit(ctx, parent, ok, key, blobOID) + if err != nil { + return backoff.Permanent(err) + } + + if !ok { + // Best-effort ref creation. If a concurrent writer created the ref first, retry + // and take the normal CAS path on the new head. + if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil { + if gbs.refAdvanced(ctx, parent) { + return err + } + return backoff.Permanent(err) + } + ver = newCommit.String() + return nil + } + + err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg) + if err == nil { + ver = newCommit.String() + return nil + } + + // If the ref changed since we read |parent|, retry on the new head. Otherwise + // surface the error (e.g. permissions, corruption). + if gbs.refAdvanced(ctx, parent) { + return err + } + return backoff.Permanent(err) + } + + if err := backoff.Retry(op, policy); err != nil { + if ctx.Err() != nil { + return "", ctx.Err() + } return "", err } - return "", fmt.Errorf("%w: GitBlobstore.Put", git.ErrUnimplemented) + return ver, nil +} + +func (gbs *GitBlobstore) buildPutCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID) (git.OID, string, error) { + _, indexFile, cleanup, err := newTempIndex() + if err != nil { + return "", "", err + } + defer cleanup() + + if hasParent { + if err := gbs.api.ReadTree(ctx, parent, indexFile); err != nil { + return "", "", err + } + } else { + if err := gbs.api.ReadTreeEmpty(ctx, indexFile); err != nil { + return "", "", err + } + } + + // TODO(gitblobstore): Decide on a policy for file-vs-directory prefix conflicts when staging keys. + // For example, staging "a" when "a/b" already exists in the tree/index (or vice-versa) can fail + // with a git index error (path appears as both a file and directory). Today our NBS keyspace is + // flat (e.g. "manifest", "", ".records"), so this should not occur. If we ever + // namespace keys into directories, consider proactively removing conflicting paths from the index + // before UpdateIndexCacheInfo so Put/CheckAndPut remain robust. + if err := gbs.api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, key); err != nil { + return "", "", err + } + + treeOID, err := gbs.api.WriteTree(ctx, indexFile) + if err != nil { + return "", "", err + } + + var parentPtr *git.OID + if hasParent && parent != "" { + p := parent + parentPtr = &p + } + msg := fmt.Sprintf("gitblobstore: put %s", key) + + // Prefer git's default identity from env/config when not explicitly configured. + commitOID, err := gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, gbs.identity) + if err != nil && gbs.identity == nil && isMissingGitIdentityErr(err) { + commitOID, err = gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, defaultGitBlobstoreIdentity()) + } + if err != nil { + return "", "", err + } + + return commitOID, msg, nil +} + +func defaultGitBlobstoreIdentity() *git.Identity { + // Deterministic fallback identity for environments without git identity configured. + return &git.Identity{Name: "dolt gitblobstore", Email: "gitblobstore@dolt.invalid"} +} + +func isMissingGitIdentityErr(err error) bool { + var ce *git.CmdError + if !errors.As(err, &ce) { + return false + } + msg := strings.ToLower(string(ce.Output)) + // Common git messages: + // - "Author identity unknown" + // - "fatal: unable to auto-detect email address" + // - "fatal: empty ident name" + return strings.Contains(msg, "author identity unknown") || + strings.Contains(msg, "unable to auto-detect email address") || + strings.Contains(msg, "empty ident name") +} + +func newTempIndex() (dir, indexFile string, cleanup func(), err error) { + // Create a unique temp index file. This is intentionally *not* placed under GIT_DIR: + // - some git dirs may be read-only or otherwise unsuitable for scratch files + // - we don't want to leave temp files inside the repo on crashes + // + // Note: git will also create a sibling lock file (.lock) during index writes. + f, err := os.CreateTemp("", "dolt-gitblobstore-index-") + if err != nil { + return "", "", nil, err + } + indexFile = f.Name() + _ = f.Close() + dir = filepath.Dir(indexFile) + cleanup = func() { + _ = os.Remove(indexFile) + _ = os.Remove(indexFile + ".lock") + } + return dir, indexFile, cleanup, nil +} + +func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool { + if ctx.Err() != nil { + return false + } + cur, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref) + return err == nil && ok && cur != old } func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) { diff --git a/go/store/blobstore/git_blobstore_test.go b/go/store/blobstore/git_blobstore_test.go index ff7fd177e08..cd37ec8d960 100644 --- a/go/store/blobstore/git_blobstore_test.go +++ b/go/store/blobstore/git_blobstore_test.go @@ -15,9 +15,14 @@ package blobstore import ( + "bytes" "context" "errors" + "io" + "os" "os/exec" + "path/filepath" + "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -26,10 +31,19 @@ import ( "github.com/dolthub/dolt/go/store/testutils/gitrepo" ) -func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) { +func requireGitOnPath(t *testing.T) { + t.Helper() if _, err := exec.LookPath("git"); err != nil { t.Skip("git not found on PATH") } +} + +func testIdentity() *git.Identity { + return &git.Identity{Name: "gitblobstore test", Email: "gitblobstore@test.invalid"} +} + +func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) { + requireGitOnPath(t) ctx := context.Background() repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") @@ -55,9 +69,7 @@ func TestGitBlobstore_RefMissingIsNotFound(t *testing.T) { } func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { - if _, err := exec.LookPath("git"); err != nil { - t.Skip("git not found on PATH") - } + requireGitOnPath(t) ctx := context.Background() repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") @@ -100,9 +112,7 @@ func TestGitBlobstore_ExistsAndGet_AllRange(t *testing.T) { } func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) { - if _, err := exec.LookPath("git"); err != nil { - t.Skip("git not found on PATH") - } + requireGitOnPath(t) ctx := context.Background() repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") @@ -122,9 +132,7 @@ func TestGitBlobstore_Get_NotFoundMissingKey(t *testing.T) { } func TestGitBlobstore_BlobRangeSemantics(t *testing.T) { - if _, err := exec.LookPath("git"); err != nil { - t.Skip("git not found on PATH") - } + requireGitOnPath(t) ctx := context.Background() repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") @@ -173,9 +181,7 @@ func TestGitBlobstore_BlobRangeSemantics(t *testing.T) { } func TestGitBlobstore_InvalidKeysError(t *testing.T) { - if _, err := exec.LookPath("git"); err != nil { - t.Skip("git not found on PATH") - } + requireGitOnPath(t) ctx := context.Background() repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") @@ -206,5 +212,176 @@ func TestGitBlobstore_InvalidKeysError(t *testing.T) { _, _, _, err = bs.Get(ctx, k, AllRange) require.Error(t, err, "expected error for key %q", k) + + _, err = bs.Put(ctx, k, 1, bytes.NewReader([]byte("x"))) + require.Error(t, err, "expected error for key %q", k) + } +} + +func TestGitBlobstore_Put_RoundTripAndVersion(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + want := []byte("hello put\n") + ver, err := PutBytes(ctx, bs, "k", want) + require.NoError(t, err) + require.NotEmpty(t, ver) + + ok, err := bs.Exists(ctx, "k") + require.NoError(t, err) + require.True(t, ok) + + got, ver2, err := GetBytes(ctx, bs, "k", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, want, got) +} + +func TestGitBlobstore_Put_Overwrite(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + ver1, err := PutBytes(ctx, bs, "k", []byte("v1\n")) + require.NoError(t, err) + require.NotEmpty(t, ver1) + + ver2, err := PutBytes(ctx, bs, "k", []byte("v2\n")) + require.NoError(t, err) + require.NotEmpty(t, ver2) + require.NotEqual(t, ver1, ver2) + + got, ver3, err := GetBytes(ctx, bs, "k", AllRange) + require.NoError(t, err) + require.Equal(t, ver2, ver3) + require.Equal(t, []byte("v2\n"), got) +} + +type hookGitAPI struct { + git.GitAPI + + ref string + // if set, called once before the first UpdateRefCAS executes. + onFirstCAS func(ctx context.Context, old git.OID) + did atomic.Bool +} + +func (h *hookGitAPI) UpdateRefCAS(ctx context.Context, ref string, newOID git.OID, oldOID git.OID, msg string) error { + if h.onFirstCAS != nil && !h.did.Swap(true) && ref == h.ref { + h.onFirstCAS(ctx, oldOID) + } + return h.GitAPI.UpdateRefCAS(ctx, ref, newOID, oldOID, msg) +} + +func writeKeyToRef(ctx context.Context, api git.GitAPI, ref string, key string, data []byte, author *git.Identity) (git.OID, error) { + parent, ok, err := api.TryResolveRefCommit(ctx, ref) + if err != nil { + return "", err + } + + indexDir, err := os.MkdirTemp("", "gitblobstore-test-index-") + if err != nil { + return "", err + } + defer func() { _ = os.RemoveAll(indexDir) }() + indexFile := filepath.Join(indexDir, "index") + + if ok { + if err := api.ReadTree(ctx, parent, indexFile); err != nil { + return "", err + } + } else { + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + return "", err + } } + + blobOID, err := api.HashObject(ctx, bytes.NewReader(data)) + if err != nil { + return "", err + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, key); err != nil { + return "", err + } + + treeOID, err := api.WriteTree(ctx, indexFile) + if err != nil { + return "", err + } + + var parentPtr *git.OID + if ok && parent != "" { + p := parent + parentPtr = &p + } + + msg := "test external writer" + commitOID, err := api.CommitTree(ctx, treeOID, parentPtr, msg, author) + if err != nil { + return "", err + } + + if err := api.UpdateRef(ctx, ref, commitOID, msg); err != nil { + return "", err + } + return commitOID, nil +} + +func TestGitBlobstore_Put_ContentionRetryPreservesOtherKey(t *testing.T) { + requireGitOnPath(t) + + ctx := context.Background() + repo, err := gitrepo.InitBare(ctx, t.TempDir()+"/repo.git") + require.NoError(t, err) + + // Seed the ref so Put takes the CAS path. + _, err = repo.SetRefToTree(ctx, DoltDataRef, map[string][]byte{ + "base": []byte("base\n"), + }, "seed") + require.NoError(t, err) + + bs, err := NewGitBlobstoreWithIdentity(repo.GitDir, DoltDataRef, testIdentity()) + require.NoError(t, err) + + origAPI := bs.api + h := &hookGitAPI{GitAPI: origAPI, ref: DoltDataRef} + h.onFirstCAS = func(ctx context.Context, old git.OID) { + // Advance the ref to simulate another writer committing concurrently. + _, _ = writeKeyToRef(ctx, origAPI, DoltDataRef, "external", []byte("external\n"), testIdentity()) + } + bs.api = h + + ver, err := PutBytes(ctx, bs, "k", []byte("mine\n")) + require.NoError(t, err) + require.NotEmpty(t, ver) + + got, ver2, err := GetBytes(ctx, bs, "k", AllRange) + require.NoError(t, err) + require.Equal(t, ver, ver2) + require.Equal(t, []byte("mine\n"), got) + + got, _, err = GetBytes(ctx, bs, "external", AllRange) + require.NoError(t, err) + require.Equal(t, []byte("external\n"), got) + + got, _, err = GetBytes(ctx, bs, "base", AllRange) + require.NoError(t, err) + require.Equal(t, []byte("base\n"), got) + + // Sanity: BlobReader path still works for the new commit. + rc, _, _, err := bs.Get(ctx, "k", AllRange) + require.NoError(t, err) + _, _ = io.ReadAll(rc) + _ = rc.Close() } diff --git a/go/store/blobstore/internal/git/impl_test.go b/go/store/blobstore/internal/git/impl_test.go index 8b168a9f0a0..8219f27ce19 100644 --- a/go/store/blobstore/internal/git/impl_test.go +++ b/go/store/blobstore/internal/git/impl_test.go @@ -238,6 +238,134 @@ func TestGitAPIImpl_WriteTree_FromEmptyIndex(t *testing.T) { } } +func TestGitAPIImpl_UpdateIndexCacheInfo_ReplacesExistingEntry(t *testing.T) { + t.Parallel() + + ctx := context.Background() + repo, err := gitrepo.InitBareTemp(ctx, "") + if err != nil { + t.Fatal(err) + } + + r, err := NewRunner(repo.GitDir) + if err != nil { + t.Fatal(err) + } + api := NewGitAPIImpl(r) + + indexFile := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + + path := "same.txt" + + oid1, err := api.HashObject(ctx, bytes.NewReader([]byte("one\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oid1, path); err != nil { + t.Fatal(err) + } + + // Update the same path again; this should succeed and replace the index entry. + oid2, err := api.HashObject(ctx, bytes.NewReader([]byte("two\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oid2, path); err != nil { + t.Fatal(err) + } + + treeOID, err := api.WriteTree(ctx, indexFile) + if err != nil { + t.Fatal(err) + } + commitOID, err := api.CommitTree(ctx, treeOID, nil, "replace entry", testAuthor()) + if err != nil { + t.Fatal(err) + } + + gotBlobOID, err := api.ResolvePathBlob(ctx, commitOID, path) + if err != nil { + t.Fatal(err) + } + rc, err := api.BlobReader(ctx, gotBlobOID) + if err != nil { + t.Fatal(err) + } + defer rc.Close() + got, err := io.ReadAll(rc) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, []byte("two\n")) { + t.Fatalf("expected replacement contents %q, got %q", "two\n", string(got)) + } +} + +func TestGitAPIImpl_UpdateIndexCacheInfo_FileDirectoryConflictErrors(t *testing.T) { + t.Parallel() + + ctx := context.Background() + repo, err := gitrepo.InitBareTemp(ctx, "") + if err != nil { + t.Fatal(err) + } + + r, err := NewRunner(repo.GitDir) + if err != nil { + t.Fatal(err) + } + api := NewGitAPIImpl(r) + + indexFile := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile); err != nil { + t.Fatal(err) + } + + oidDirChild, err := api.HashObject(ctx, bytes.NewReader([]byte("child\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidDirChild, "a/b.txt"); err != nil { + t.Fatal(err) + } + + // Now try to stage "a" as a file; git should reject this (file vs directory conflict). + oidA, err := api.HashObject(ctx, bytes.NewReader([]byte("a\n"))) + if err != nil { + t.Fatal(err) + } + err = api.UpdateIndexCacheInfo(ctx, indexFile, "100644", oidA, "a") + if err == nil { + t.Fatalf("expected conflict error staging %q when %q exists", "a", "a/b.txt") + } + + // Inverse conflict: stage "x" as a file, then try to stage "x/y". + indexFile2 := tempIndexFile(t) + if err := api.ReadTreeEmpty(ctx, indexFile2); err != nil { + t.Fatal(err) + } + + oidX, err := api.HashObject(ctx, bytes.NewReader([]byte("x\n"))) + if err != nil { + t.Fatal(err) + } + if err := api.UpdateIndexCacheInfo(ctx, indexFile2, "100644", oidX, "x"); err != nil { + t.Fatal(err) + } + + oidXY, err := api.HashObject(ctx, bytes.NewReader([]byte("xy\n"))) + if err != nil { + t.Fatal(err) + } + err = api.UpdateIndexCacheInfo(ctx, indexFile2, "100644", oidXY, "x/y.txt") + if err == nil { + t.Fatalf("expected conflict error staging %q when %q exists", "x/y.txt", "x") + } +} + func TestGitAPIImpl_ReadTree_PreservesExistingPaths(t *testing.T) { t.Parallel()