Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 190 additions & 8 deletions go/store/blobstore/git_blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ package blobstore

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/cenkalti/backoff/v4"

git "github.com/dolthub/dolt/go/store/blobstore/internal/git"
)
Expand All @@ -27,26 +33,37 @@ import (
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
//
// This implementation is being developed in phases. The initial version was
// intentionally read-only so that read behavior for manifests and table files could
// be locked down first; write paths are now being added incrementally. At the moment,
// Put is implemented, while CheckAndPut and Concatenate still return an explicit
// unimplemented error.
type GitBlobstore struct {
// gitDir is the repository location handed to the constructor; it is expected
// to be a bare repo directory or a .git directory.
gitDir string
// ref is the git ref (e.g. refs/dolt/data) whose commit tree holds the keys.
ref string
// runner is the command runner created from gitDir via git.NewRunner.
runner *git.Runner
// api is the git operations interface built on top of runner
// (git.NewGitAPIImpl); the read and write paths in this file go through it.
api git.GitAPI
// identity, when non-nil, is used as the author/committer identity for new commits.
// When nil, we prefer whatever identity git derives from env/config, falling back
// to a deterministic default only if git reports the identity is missing.
identity *git.Identity
}

var _ Blobstore = (*GitBlobstore)(nil)

// NewGitBlobstore returns a GitBlobstore for the git database at |gitDir| whose keys
// live in the tree of the commit referenced by |ref|. |gitDir| should point at a bare
// repo directory or a .git directory.
//
// No author/committer identity is forced; it is equivalent to calling
// NewGitBlobstoreWithIdentity with a nil identity. Put is implemented, while
// CheckAndPut and Concatenate are still unimplemented (see type-level docs).
func NewGitBlobstore(gitDir, ref string) (*GitBlobstore, error) {
	// A nil identity leaves identity selection to the write path.
	var noIdentity *git.Identity
	return NewGitBlobstoreWithIdentity(gitDir, ref, noIdentity)
}

// NewGitBlobstoreWithIdentity creates a GitBlobstore rooted at |gitDir| and |ref|, optionally
// forcing an author/committer identity for write paths.
//
// |gitDir| is passed to git.NewRunner; any error from creating the runner is returned
// unchanged. |identity| may be nil, in which case no identity is forced and the write
// path decides which identity to use.
func NewGitBlobstoreWithIdentity(gitDir, ref string, identity *git.Identity) (*GitBlobstore, error) {
	r, err := git.NewRunner(gitDir)
	if err != nil {
		return nil, err
	}
	// The identity must be stored on the blobstore; a stale pre-identity return
	// statement previously preceded this one and silently dropped it.
	return &GitBlobstore{
		gitDir:   gitDir,
		ref:      ref,
		runner:   r,
		api:      git.NewGitAPIImpl(r),
		identity: identity,
	}, nil
}

func (gbs *GitBlobstore) Path() string {
Expand Down Expand Up @@ -157,10 +174,175 @@ func (l *limitReadCloser) Read(p []byte) (int, error) { return l.r.Read(p) }
// Close closes the wrapped closer and reports whatever error it returns.
func (l *limitReadCloser) Close() error {
	closeErr := l.c.Close()
	return closeErr
}

func (gbs *GitBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) {
if _, err := normalizeGitTreePath(key); err != nil {
key, err := normalizeGitTreePath(key)
if err != nil {
return "", err
}

// Hash the contents once. If we need to retry due to concurrent updates to |gbs.ref|,
// we can reuse the blob OID without re-reading |reader|.
blobOID, err := gbs.api.HashObject(ctx, reader)
if err != nil {
return "", err
}

// Make Put resilient to concurrent writers updating unrelated keys by using a CAS loop
// under the hood. This matches typical object-store semantics more closely than an
// unconditional ref update (which could clobber other keys).
const maxRetries = 31 // 32 total attempts (initial + retries)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Millisecond
bo.Multiplier = 2
bo.MaxInterval = 320 * time.Millisecond
bo.RandomizationFactor = 0 // deterministic; can add jitter later if needed
bo.Reset()
policy := backoff.WithContext(backoff.WithMaxRetries(bo, maxRetries), ctx)

var ver string
op := func() error {
parent, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return backoff.Permanent(err)
}

newCommit, msg, err := gbs.buildPutCommit(ctx, parent, ok, key, blobOID)
if err != nil {
return backoff.Permanent(err)
}

if !ok {
// Best-effort ref creation. If a concurrent writer created the ref first, retry
// and take the normal CAS path on the new head.
if err := gbs.api.UpdateRef(ctx, gbs.ref, newCommit, msg); err != nil {
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}
ver = newCommit.String()
return nil
}

err = gbs.api.UpdateRefCAS(ctx, gbs.ref, newCommit, parent, msg)
if err == nil {
ver = newCommit.String()
return nil
}

// If the ref changed since we read |parent|, retry on the new head. Otherwise
// surface the error (e.g. permissions, corruption).
if gbs.refAdvanced(ctx, parent) {
return err
}
return backoff.Permanent(err)
}

if err := backoff.Retry(op, policy); err != nil {
if ctx.Err() != nil {
return "", ctx.Err()
}
return "", err
}
return "", fmt.Errorf("%w: GitBlobstore.Put", git.ErrUnimplemented)
return ver, nil
}

func (gbs *GitBlobstore) buildPutCommit(ctx context.Context, parent git.OID, hasParent bool, key string, blobOID git.OID) (git.OID, string, error) {
_, indexFile, cleanup, err := newTempIndex()
if err != nil {
return "", "", err
}
defer cleanup()

if hasParent {
if err := gbs.api.ReadTree(ctx, parent, indexFile); err != nil {
return "", "", err
}
} else {
if err := gbs.api.ReadTreeEmpty(ctx, indexFile); err != nil {
return "", "", err
}
}

// TODO(gitblobstore): Decide on a policy for file-vs-directory prefix conflicts when staging keys.
// For example, staging "a" when "a/b" already exists in the tree/index (or vice-versa) can fail
// with a git index error (path appears as both a file and directory). Today our NBS keyspace is
// flat (e.g. "manifest", "<tableid>", "<tableid>.records"), so this should not occur. If we ever
// namespace keys into directories, consider proactively removing conflicting paths from the index
// before UpdateIndexCacheInfo so Put/CheckAndPut remain robust.
if err := gbs.api.UpdateIndexCacheInfo(ctx, indexFile, "100644", blobOID, key); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do some testing and verify whether this is idempotent? If it isn't, can we make it so?

That is to say: If the file already exists, we should succeed here (AFAIK...)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe its idempotent for files (overwrites) but will error if the a directory prefix is already staged, then you try writing that. Added a test that demonstrates this, though I don't expect this to be a problem for us since we are writting files only: https://github.com/dolthub/dolt/pull/10417/changes#diff-7e67da3161a3fc914b18c9152a5ea340fbf582ca9cb2efc2dc5ef6ab7b0d5c20R335

return "", "", err
}

treeOID, err := gbs.api.WriteTree(ctx, indexFile)
if err != nil {
return "", "", err
}

var parentPtr *git.OID
if hasParent && parent != "" {
p := parent
parentPtr = &p
}
msg := fmt.Sprintf("gitblobstore: put %s", key)

// Prefer git's default identity from env/config when not explicitly configured.
commitOID, err := gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, gbs.identity)
if err != nil && gbs.identity == nil && isMissingGitIdentityErr(err) {
commitOID, err = gbs.api.CommitTree(ctx, treeOID, parentPtr, msg, defaultGitBlobstoreIdentity())
}
if err != nil {
return "", "", err
}

return commitOID, msg, nil
}

// defaultGitBlobstoreIdentity returns the fixed author/committer identity used as a
// fallback in environments where git has no identity configured. A fresh value is
// allocated on every call.
func defaultGitBlobstoreIdentity() *git.Identity {
	fallback := new(git.Identity)
	fallback.Name = "dolt gitblobstore"
	fallback.Email = "gitblobstore@dolt.invalid"
	return fallback
}

// isMissingGitIdentityErr reports whether |err| wraps a *git.CmdError whose output
// contains one of git's "no author/committer identity" messages. The comparison is
// case-insensitive. Errors that are not command errors always yield false.
func isMissingGitIdentityErr(err error) bool {
	var cmdErr *git.CmdError
	if found := errors.As(err, &cmdErr); !found {
		return false
	}

	output := strings.ToLower(string(cmdErr.Output))

	// Lower-cased fragments of the common git messages:
	//   - "Author identity unknown"
	//   - "fatal: unable to auto-detect email address"
	//   - "fatal: empty ident name"
	markers := [...]string{
		"author identity unknown",
		"unable to auto-detect email address",
		"empty ident name",
	}
	for _, marker := range markers {
		if strings.Contains(output, marker) {
			return true
		}
	}
	return false
}

// newTempIndex creates an empty scratch file intended for use as a git index.
//
// It returns the file's parent directory, the file's path, and a cleanup function
// that removes the file along with its "<index>.lock" sibling. On failure the
// strings are empty, cleanup is nil, and err is the error from os.CreateTemp.
func newTempIndex() (dir, indexFile string, cleanup func(), err error) {
	// The scratch index lives in the OS temp directory, deliberately *not* under GIT_DIR:
	// - some git dirs may be read-only or otherwise unsuitable for scratch files
	// - we don't want to leave temp files inside the repo on crashes
	//
	// Note: git will also create a sibling lock file (<index>.lock) during index writes.
	tmp, createErr := os.CreateTemp("", "dolt-gitblobstore-index-")
	if createErr != nil {
		return "", "", nil, createErr
	}

	path := tmp.Name()
	// Only the path is handed back to the caller, so the handle is released right away.
	// A close failure on this freshly created file is deliberately ignored.
	_ = tmp.Close()

	remove := func() {
		// Best-effort removal; either file may already be gone.
		for _, p := range []string{path, path + ".lock"} {
			_ = os.Remove(p)
		}
	}
	return filepath.Dir(path), path, remove, nil
}

// refAdvanced reports whether |gbs.ref| currently resolves to a commit other than
// |old|. It returns false when |ctx| is already done, when the ref cannot be
// resolved, or when the ref does not exist.
func (gbs *GitBlobstore) refAdvanced(ctx context.Context, old git.OID) bool {
	// Don't bother git once the caller has given up.
	if ctx.Err() != nil {
		return false
	}

	head, found, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
	if err != nil || !found {
		return false
	}
	return head != old
}

func (gbs *GitBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key string, totalSize int64, reader io.Reader) (string, error) {
Expand Down
Loading
Loading