Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package fsutil

import (
"io"
)

const chunkSize = 32 * 1024

type buffer struct {
chunks [][]byte
}

func (b *buffer) alloc(n int) []byte {
if n > chunkSize {
buf := make([]byte, n)
b.chunks = append(b.chunks, buf)
return buf
}

if len(b.chunks) != 0 {
lastChunk := b.chunks[len(b.chunks)-1]
l := len(lastChunk)
if l+n <= cap(lastChunk) {
lastChunk = lastChunk[:l+n]
b.chunks[len(b.chunks)-1] = lastChunk
return lastChunk[l : l+n]
}
}

buf := make([]byte, n, chunkSize)
b.chunks = append(b.chunks, buf)
return buf
}

func (b *buffer) WriteTo(w io.Writer) (n int64, err error) {
for _, c := range b.chunks {
m, err := w.Write(c)
n += int64(m)
if err != nil {
return n, err
}
}
return n, nil
}
70 changes: 70 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package fsutil

import (
"bytes"
"testing"

"github.com/stretchr/testify/require"
)

func TestAllocAndBytes(t *testing.T) {
buf := &buffer{}

out1 := buf.alloc(10)
require.Len(t, out1, 10)
copy(out1, []byte("abcdefghij"))

out2 := buf.alloc(5)
require.Len(t, out2, 5)
copy(out2, []byte("12345"))

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(15), n)
require.Equal(t, []byte("abcdefghij12345"), res.Bytes())
}

func TestLargeAllocGetsOwnChunk(t *testing.T) {
buf := &buffer{}

out := buf.alloc(100000)
require.Len(t, out, 100000)

for i := range out {
out[i] = byte(i % 256)
}

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(100000), n)
require.Equal(t, 100000, res.Len())
}

func TestMultipleChunkBoundary(t *testing.T) {
buf := &buffer{}

var written []byte
for i := 0; i < 100; i++ {
b := buf.alloc(400)
require.Len(t, b, 400)
for j := range b {
b[j] = byte((i + j) % 256)
}
written = append(written, b...)
}

res := &bytes.Buffer{}
n, err := buf.WriteTo(res)
require.NoError(t, err)
require.Equal(t, int64(40000), n)
require.Equal(t, 40000, res.Len())
dt := res.Bytes()
for i := 0; i < 100; i++ {
for j := 0; j < 400; j++ {
require.Equal(t, byte((i+j)%256), dt[i*400+j])
}
}
require.Equal(t, written, dt)
}
66 changes: 55 additions & 11 deletions receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package fsutil

import (
"context"
"encoding/binary"
"io"
"os"
"path/filepath"
Expand All @@ -51,13 +52,16 @@ const (
DiffContent
)

const metadataPath = ".fsutil-metadata"

type ReceiveOpt struct {
NotifyHashed ChangeFunc
ContentHasher ContentHasher
ProgressCb func(int, bool)
Merge bool
Filter FilterFunc
Differ DiffType
MetadataOnly FilterFunc
}

func Receive(ctx context.Context, conn Stream, dest string, opt ReceiveOpt) error {
Expand All @@ -75,21 +79,23 @@ func Receive(ctx context.Context, conn Stream, dest string, opt ReceiveOpt) erro
merge: opt.Merge,
filter: opt.Filter,
differ: opt.Differ,
metadataOnly: opt.MetadataOnly,
}
return r.run(ctx)
}

type receiver struct {
dest string
conn Stream
files map[string]uint32
pipes map[uint32]io.WriteCloser
mu sync.RWMutex
muPipes sync.RWMutex
progressCb func(int, bool)
merge bool
filter FilterFunc
differ DiffType
dest string
conn Stream
files map[string]uint32
pipes map[uint32]io.WriteCloser
mu sync.RWMutex
muPipes sync.RWMutex
progressCb func(int, bool)
merge bool
filter FilterFunc
differ DiffType
metadataOnly FilterFunc

notifyHashed ChangeFunc
contentHasher ContentHasher
Expand Down Expand Up @@ -164,6 +170,8 @@ func (r *receiver) run(ctx context.Context) error {
}

w := newDynamicWalker()
metadataOnly := r.metadataOnly != nil
metadataBuffer := &buffer{}

g.Go(func() (retErr error) {
defer func() {
Expand Down Expand Up @@ -223,6 +231,22 @@ func (r *receiver) run(ctx context.Context) error {
// e.g. a linux path foo/bar\baz cannot be represented on windows
return errors.WithStack(&os.PathError{Path: p.Stat.Path, Err: syscall.EINVAL, Op: "unrepresentable path"})
}
if metadataOnly {
if path == metadataPath {
continue
}
n := p.Stat.SizeVT()
dt := metadataBuffer.alloc(n + 4)
binary.LittleEndian.PutUint32(dt[0:4], uint32(n))
_, err := p.Stat.MarshalToSizedBufferVT(dt[4:])
if err != nil {
return err
}
if !r.metadataOnly(path, p.Stat) {
i++
continue
}
}
p.Stat.Path = path
p.Stat.Linkname = filepath.FromSlash(p.Stat.Linkname)

Expand Down Expand Up @@ -272,7 +296,27 @@ func (r *receiver) run(ctx context.Context) error {
}
}
})
return g.Wait()

if err := g.Wait(); err != nil {
return err
}

if !metadataOnly {
return nil
}

// although we don't allow tranferring metadataPath, make sure there was no preexisting file/symlink
os.Remove(filepath.Join(r.dest, metadataPath))

f, err := os.OpenFile(filepath.Join(r.dest, metadataPath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
if _, err := metadataBuffer.WriteTo(f); err != nil {
f.Close()
return err
}
return f.Close()
}

func (r *receiver) asyncDataFunc(ctx context.Context, p string, wc io.WriteCloser) error {
Expand Down
107 changes: 107 additions & 0 deletions receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"hash"
"io"
gofs "io/fs"
Expand Down Expand Up @@ -511,6 +512,112 @@ file zzz.aa
assert.Equal(t, false, ok)
}

func TestCopyMetadataOnly(t *testing.T) {
d, err := tmpDir(changeStream([]string{
"ADD foo file data1",
"ADD foo2 file dat2",
"ADD zzz dir",
"ADD zzz/aa file data3",
}))
assert.NoError(t, err)
defer os.RemoveAll(d)
fs, err := NewFS(d)
assert.NoError(t, err)
fs, err = NewFilterFS(fs, &FilterOpt{
Map: func(_ string, s *types.Stat) MapResult {
s.Uid = 0
s.Gid = 0
return MapResultKeep
},
})
assert.NoError(t, err)

dest := t.TempDir()

ts := newNotificationBuffer()
chs := &changes{fn: ts.HandleChange}

eg, ctx := errgroup.WithContext(context.Background())
s1, s2 := sockPairProto(ctx)

tm := time.Now().Truncate(time.Hour)

var processCbWasCalled bool
progressCb := func(size int, last bool) {
processCbWasCalled = true
}

eg.Go(func() error {
defer s1.(*fakeConnProto).closeSend()
return Send(ctx, s1, fs, progressCb)
})
eg.Go(func() error {
return Receive(ctx, s2, dest, ReceiveOpt{
NotifyHashed: chs.HandleChange,
ContentHasher: simpleSHA256Hasher,
Filter: func(p string, s *types.Stat) bool {
if runtime.GOOS != "windows" {
// On Windows, Getuid() and Getgid() always return -1
// See: https://pkg.go.dev/os#Getgid
// See: https://pkg.go.dev/os#Geteuid
s.Uid = uint32(os.Getuid())
s.Gid = uint32(os.Getgid())
}
s.ModTime = tm.UnixNano()
return true
},
MetadataOnly: func(p string, s *types.Stat) bool {
return p == "foo2"
},
})
})

assert.NoError(t, eg.Wait())
assert.True(t, processCbWasCalled)

b := &bytes.Buffer{}
err = Walk(context.Background(), dest, nil, bufWalk(b))
assert.NoError(t, err)

assert.Equal(t, `file .fsutil-metadata
file foo2
`, b.String())

dt, err := os.ReadFile(filepath.Join(dest, "foo2"))
assert.NoError(t, err)
assert.Equal(t, "dat2", string(dt))

dt, err = os.ReadFile(filepath.Join(dest, ".fsutil-metadata"))
assert.NoError(t, err)

files := parseFSMetadata(t, dt)
assert.Equal(t, 4, len(files))

assert.Equal(t, "foo", files[0].Path)
assert.Equal(t, int64(5), files[0].Size)
assert.Equal(t, "foo2", files[1].Path)
assert.Equal(t, int64(4), files[1].Size)
assert.Equal(t, "zzz", files[2].Path)
assert.Equal(t, int64(0), files[2].Size)
assert.True(t, (&StatInfo{&files[2]}).IsDir())
assert.Equal(t, "zzz/aa", files[3].Path)
assert.Equal(t, int64(5), files[3].Size)
}

func parseFSMetadata(t *testing.T, dt []byte) []types.Stat {
var m []types.Stat
for len(dt) > 0 {
var s types.Stat
n := binary.LittleEndian.Uint32(dt[:4])
dt = dt[4:]
err := s.Unmarshal(dt[:n])
assert.NoError(t, err)
m = append(m, *s.CloneVT())
dt = dt[n:]
}
return m
}

func sockPairProto(ctx context.Context) (Stream, Stream) {
c1 := make(chan []byte, 32)
c2 := make(chan []byte, 32)
Expand Down