Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package fsutil

import (
"io"
)

const chunkSize = 32 * 1024

type buffer struct {
chunks [][]byte
}

func (b *buffer) alloc(n int) []byte {
if n > chunkSize {
buf := make([]byte, n)
b.chunks = append(b.chunks, buf)
return buf
}

if len(b.chunks) != 0 {
lastChunk := b.chunks[len(b.chunks)-1]
l := len(lastChunk)
if l+n <= cap(lastChunk) {
lastChunk = lastChunk[:l+n]
b.chunks[len(b.chunks)-1] = lastChunk
return lastChunk[l : l+n]
}
}

buf := make([]byte, n, chunkSize)
b.chunks = append(b.chunks, buf)
return buf
}

func (b *buffer) WriteTo(w io.Writer) (n int64, err error) {
for _, c := range b.chunks {
m, err := w.Write(c)
n += int64(m)
if err != nil {
return n, err
}
}
return n, nil
}
70 changes: 70 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package fsutil

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"
)

func TestAllocAndBytes(t *testing.T) {
buf := &buffer{}

out1 := buf.alloc(10)
require.Len(t, out1, 10)
copy(out1, []byte("abcdefghij"))

out2 := buf.alloc(5)
require.Len(t, out2, 5)
copy(out2, []byte("12345"))

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(15), n)
require.Equal(t, []byte("abcdefghij12345"), res.Bytes())
}

func TestLargeAllocGetsOwnChunk(t *testing.T) {
buf := &buffer{}

out := buf.alloc(100000)
require.Len(t, out, 100000)

for i := range out {
out[i] = byte(i % 256)
}

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(100000), n)
require.Equal(t, 100000, res.Len())
}

func TestMultipleChunkBoundary(t *testing.T) {
buf := &buffer{}

var written []byte
for i := 0; i < 100; i++ {
b := buf.alloc(400)
require.Len(t, b, 400)
for j := range b {
b[j] = byte((i + j) % 256)
}
written = append(written, b...)
}

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(40000), n)
require.Equal(t, 40000, res.Len())
dt := res.Bytes()
for i := 0; i < 100; i++ {
for j := 0; j < 400; j++ {
require.Equal(t, byte((i+j)%256), dt[i*400+j])
}
}
require.Equal(t, written, dt)
}
132 changes: 120 additions & 12 deletions receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package fsutil

import (
"context"
"encoding/binary"
"io"
"os"
"path/filepath"
Expand All @@ -51,13 +52,16 @@ const (
DiffContent
)

const metadataPath = ".fsutil-metadata"

type ReceiveOpt struct {
NotifyHashed ChangeFunc
ContentHasher ContentHasher
ProgressCb func(int, bool)
Merge bool
Filter FilterFunc
Differ DiffType
MetadataOnly FilterFunc
}

func Receive(ctx context.Context, conn Stream, dest string, opt ReceiveOpt) error {
Expand All @@ -75,21 +79,23 @@ func Receive(ctx context.Context, conn Stream, dest string, opt ReceiveOpt) erro
merge: opt.Merge,
filter: opt.Filter,
differ: opt.Differ,
metadataOnly: opt.MetadataOnly,
}
return r.run(ctx)
}

type receiver struct {
dest string
conn Stream
files map[string]uint32
pipes map[uint32]io.WriteCloser
mu sync.RWMutex
muPipes sync.RWMutex
progressCb func(int, bool)
merge bool
filter FilterFunc
differ DiffType
dest string
conn Stream
files map[string]uint32
pipes map[uint32]io.WriteCloser
mu sync.RWMutex
muPipes sync.RWMutex
progressCb func(int, bool)
merge bool
filter FilterFunc
differ DiffType
metadataOnly FilterFunc

notifyHashed ChangeFunc
contentHasher ContentHasher
Expand Down Expand Up @@ -164,6 +170,11 @@ func (r *receiver) run(ctx context.Context) error {
}

w := newDynamicWalker()
metadataTransfer := r.metadataOnly != nil
// buffer Stat metadata in framed proto
metadataBuffer := &buffer{}
// stack of parent paths that can be replayed if metadata filter matches
metadataParents := newStack[*currentPath]()

g.Go(func() (retErr error) {
defer func() {
Expand Down Expand Up @@ -223,10 +234,26 @@ func (r *receiver) run(ctx context.Context) error {
// e.g. a linux path foo/bar\baz cannot be represented on windows
return errors.WithStack(&os.PathError{Path: p.Stat.Path, Err: syscall.EINVAL, Op: "unrepresentable path"})
}
var metaOnly bool
if metadataTransfer {
if path == metadataPath {
continue
}
n := p.Stat.SizeVT()
dt := metadataBuffer.alloc(n + 4)
binary.LittleEndian.PutUint32(dt[0:4], uint32(n))
_, err := p.Stat.MarshalToSizedBufferVT(dt[4:])
if err != nil {
return err
}
if !r.metadataOnly(path, p.Stat) {
metaOnly = true
}
}
p.Stat.Path = path
p.Stat.Linkname = filepath.FromSlash(p.Stat.Linkname)

if fileCanRequestData(os.FileMode(p.Stat.Mode)) {
if !metaOnly && fileCanRequestData(os.FileMode(p.Stat.Mode)) {
r.mu.Lock()
r.files[p.Stat.Path] = i
r.mu.Unlock()
Expand All @@ -240,6 +267,31 @@ func (r *receiver) run(ctx context.Context) error {
if err := r.hlValidator.HandleChange(ChangeKindAdd, cp.path, &StatInfo{cp.stat}, nil); err != nil {
return err
}
if metadataTransfer {
parent := filepath.Dir(cp.path)
isDir := os.FileMode(p.Stat.Mode).IsDir()
for {
last, ok := metadataParents.peek()
if !ok || parent == last.path {
break
}
metadataParents.pop()
}
if isDir {
metadataParents.push(cp)
}
if metaOnly {
continue
} else {
for _, cp := range metadataParents.items {
if err := w.update(cp); err != nil {
return err
}
}
metadataParents.clear()
}
}

if err := w.update(cp); err != nil {
return err
}
Expand Down Expand Up @@ -272,7 +324,27 @@ func (r *receiver) run(ctx context.Context) error {
}
}
})
return g.Wait()

if err := g.Wait(); err != nil {
return err
}

if !metadataTransfer {
return nil
}

// although we don't allow tranferring metadataPath, make sure there was no preexisting file/symlink
os.Remove(filepath.Join(r.dest, metadataPath))

f, err := os.OpenFile(filepath.Join(r.dest, metadataPath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
if _, err := metadataBuffer.WriteTo(f); err != nil {
f.Close()
return err
}
return f.Close()
}

func (r *receiver) asyncDataFunc(ctx context.Context, p string, wc io.WriteCloser) error {
Expand Down Expand Up @@ -327,3 +399,39 @@ func (w *wrappedWriteCloser) Wait(ctx context.Context) error {
return w.err
}
}

type stack[T any] struct {
items []T
}

func newStack[T any]() *stack[T] {
return &stack[T]{
items: make([]T, 0, 8),
}
}

func (s *stack[T]) push(v T) {
s.items = append(s.items, v)
}

func (s *stack[T]) pop() (T, bool) {
if len(s.items) == 0 {
var zero T
return zero, false
}
v := s.items[len(s.items)-1]
s.items = s.items[:len(s.items)-1]
return v, true
}

func (s *stack[T]) peek() (T, bool) {
if len(s.items) == 0 {
var zero T
return zero, false
}
return s.items[len(s.items)-1], true
}

func (s *stack[T]) clear() {
s.items = s.items[:0]
}
Loading