Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 212 additions & 6 deletions go/store/blobstore/git_blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,83 @@ func (m *multiPartReadCloser) Close() error {
return nil
}

type concatReadCloser struct {
ctx context.Context
keys []string
open func(ctx context.Context, key string) (io.ReadCloser, error)
cur int
curRC io.ReadCloser
done bool
}

func (c *concatReadCloser) ensureCurrent() error {
if c.done || c.curRC != nil {
return nil
}
if c.cur >= len(c.keys) {
c.done = true
return nil
}
rc, err := c.open(c.ctx, c.keys[c.cur])
if err != nil {
return err
}
c.curRC = rc
return nil
}

func (c *concatReadCloser) closeCurrentAndAdvance() error {
if c.curRC != nil {
err := c.curRC.Close()
c.curRC = nil
c.cur++
return err
}
c.cur++
return nil
}

func (c *concatReadCloser) Read(p []byte) (int, error) {
for {
if err := c.ensureCurrent(); err != nil {
return 0, err
}
if c.curRC == nil {
return 0, io.EOF
}

n, err := c.curRC.Read(p)
if n > 0 {
// Preserve data; defer advancement until next Read call.
if err == io.EOF {
_ = c.closeCurrentAndAdvance()
return n, nil
}
return n, err
}
if err == nil {
continue
}
if err == io.EOF {
if cerr := c.closeCurrentAndAdvance(); cerr != nil {
return 0, cerr
}
continue
}
return 0, err
}
}

func (c *concatReadCloser) Close() error {
c.done = true
if c.curRC != nil {
err := c.curRC.Close()
c.curRC = nil
return err
}
return nil
}

// GitBlobstore is a Blobstore implementation backed by a git repository's object
// database (bare repo or .git directory). It stores keys as paths within the tree
// of the commit referenced by a git ref (e.g. refs/dolt/data).
Expand Down Expand Up @@ -801,20 +878,149 @@ func (gbs *GitBlobstore) currentKeyVersion(ctx context.Context, commit git.OID,
}

func (gbs *GitBlobstore) Concatenate(ctx context.Context, key string, sources []string) (string, error) {
// Chunked-object support is landing in phases. Concatenate is the final piece
// needed for NBS conjoin and is intentionally left unimplemented on this branch.
//
// Keep key validation for consistent error behavior.
_, err := normalizeGitTreePath(key)
var err error
key, err = normalizeGitTreePath(key)
if err != nil {
return "", err
}
if len(sources) == 0 {
return "", fmt.Errorf("gitblobstore: concatenate requires at least one source")
}
normSources := make([]string, 0, len(sources))
for _, src := range sources {
if _, err := normalizeGitTreePath(src); err != nil {
norm, err := normalizeGitTreePath(src)
if err != nil {
return "", err
}
normSources = append(normSources, norm)
}
sources = normSources

// For non-manifest keys, match Put's behavior: if the key already exists, succeed without overwriting.
if ver, ok, err := gbs.tryFastSucceedPutIfKeyExists(ctx, key); err != nil {
return "", err
} else if ok {
return ver, nil
}

// Resolve a snapshot commit for the sources.
commit, ok, err := gbs.api.TryResolveRefCommit(ctx, gbs.ref)
if err != nil {
return "", err
}
if !ok {
// Consistent with Get: empty store => manifest missing, other keys => ref missing.
if key == "manifest" {
return "", NotFound{Key: key}
}
return "", &git.RefNotFoundError{Ref: gbs.ref}
}

totalSz, err := gbs.totalSizeAtCommit(ctx, commit, sources)
if err != nil {
return "", err
}

rc := &concatReadCloser{
ctx: ctx,
keys: sources,
open: func(ctx context.Context, k string) (io.ReadCloser, error) {
return gbs.openReaderAtCommit(ctx, commit, k)
},
}
defer rc.Close()

plan, err := gbs.planPutWrites(ctx, key, totalSz, rc)
if err != nil {
return "", err
}

msg := fmt.Sprintf("gitblobstore: concatenate %s", key)
return gbs.putWithCASRetries(ctx, key, plan, msg)
}

func (gbs *GitBlobstore) openReaderAtCommit(ctx context.Context, commit git.OID, key string) (io.ReadCloser, error) {
oid, typ, err := gbs.resolveObjectForGet(ctx, commit, key)
if err != nil {
return nil, err
}
switch typ {
case git.ObjectTypeBlob:
return gbs.api.BlobReader(ctx, oid)
case git.ObjectTypeTree:
rc, _, _, err := gbs.openChunkedTreeRange(ctx, commit, key, AllRange)
if err != nil {
// Defensive: resolveObjectForGet succeeded, but keep NotFound mapping consistent.
var pnf *git.PathNotFoundError
if errors.As(err, &pnf) {
return nil, NotFound{Key: key}
}
return nil, err
}
return rc, nil
default:
return nil, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key)
}
}

// sizeAtCommit returns the byte size of |key| as of |commit|.
// It supports both inline blobs and the chunked-tree representation used by GitBlobstore.
// If |key| is missing at |commit|, it returns NotFound{Key: key}.
func (gbs *GitBlobstore) sizeAtCommit(ctx context.Context, commit git.OID, key string) (uint64, error) {
oid, typ, err := gbs.api.ResolvePathObject(ctx, commit, key)
if err != nil {
if git.IsPathNotFound(err) {
return 0, NotFound{Key: key}
}
return 0, err
}

switch typ {
case git.ObjectTypeBlob:
sz, err := gbs.api.BlobSize(ctx, oid)
if err != nil {
return 0, err
}
if sz < 0 {
return 0, fmt.Errorf("gitblobstore: invalid blob size %d for key %q", sz, key)
}
return uint64(sz), nil

case git.ObjectTypeTree:
entries, err := gbs.api.ListTree(ctx, commit, key)
if err != nil {
if git.IsPathNotFound(err) {
return 0, NotFound{Key: key}
}
return 0, err
}
_, total, err := gbs.validateAndSizeChunkedParts(ctx, entries)
return total, err

default:
return 0, fmt.Errorf("gitblobstore: unsupported object type %q for key %q", typ, key)
}
}

// totalSizeAtCommit sums the sizes of |sources| at |commit| and returns the total as int64.
// Returns an error on overflow or if any source is missing.
func (gbs *GitBlobstore) totalSizeAtCommit(ctx context.Context, commit git.OID, sources []string) (int64, error) {
var total uint64
for _, src := range sources {
sz, err := gbs.sizeAtCommit(ctx, commit, src)
if err != nil {
return 0, err
}
if sz > math.MaxUint64-total {
return 0, fmt.Errorf("gitblobstore: concatenated size overflow")
}
total += sz
}
if total > uint64(math.MaxInt64) {
return 0, fmt.Errorf("gitblobstore: concatenated size %d overflows int64", total)
}
return "", git.ErrUnimplemented
return int64(total), nil
}

func sliceInlineBlob(rc io.ReadCloser, sz int64, br BlobRange, ver string) (io.ReadCloser, uint64, string, error) {
Expand Down
Loading
Loading