Skip to content

Commit

Permalink
sync: fix temporary name when create a file (#4215)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored and SandyXSD committed Feb 3, 2024
1 parent 437f4e6 commit 85c11d6
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 22 deletions.
21 changes: 12 additions & 9 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -123,22 +122,26 @@ func (j *juiceFS) Put(key string, in io.Reader) (err error) {
eno := j.jfs.MkdirAll(ctx, p, 0777, j.umask)
return toError(eno)
}
tmp := path.Join(path.Dir(p), "."+path.Base(p)+".tmp"+strconv.Itoa(rand.Int()))
f, eno := j.jfs.Create(ctx, tmp, 0666, j.umask)
if eno == syscall.ENOENT {
_ = j.jfs.MkdirAll(ctx, path.Dir(tmp), 0777, j.umask)
f, eno = j.jfs.Create(ctx, tmp, 0666, j.umask)
}
if eno != 0 {
return toError(eno)
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp := path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
if e := j.jfs.Delete(ctx, tmp); e != 0 {
logger.Warnf("Failed to delete %s: %s", tmp, e)
}
}
}()
f, eno := j.jfs.Create(ctx, tmp, 0666, j.umask)
if eno == syscall.ENOENT {
_ = j.jfs.MkdirAll(ctx, path.Dir(tmp), 0777, j.umask)
f, eno = j.jfs.Create(ctx, tmp, 0666, j.umask)
}
if eno != 0 {
return toError(eno)
}
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(&jFile{f, 0}, in, *buf)
Expand Down
6 changes: 6 additions & 0 deletions cmd/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func testFileSystem(t *testing.T, s object.ObjectStorage) {
t.Fatalf("testKeysEqual fail: %s", err)
}
}

// put a file with very long name
longName := strings.Repeat("a", 255)
if err := s.Put("dir/"+longName, bytes.NewReader([]byte{0})); err != nil {
t.Fatalf("PUT a file with long name `%s` failed: %q", longName, err)
}
}

func TestJFS(t *testing.T) {
Expand Down
18 changes: 11 additions & 7 deletions pkg/object/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,23 @@ func (d *filestore) Get(key string, off, limit int64) (io.ReadCloser, error) {
return f, nil
}

func (d *filestore) Put(key string, in io.Reader) error {
func (d *filestore) Put(key string, in io.Reader) (err error) {
p := d.path(key)

if strings.HasSuffix(key, dirSuffix) || key == "" && strings.HasSuffix(d.root, dirSuffix) {
return os.MkdirAll(p, os.FileMode(0777))
}

tmp := filepath.Join(filepath.Dir(p), "."+filepath.Base(p)+".tmp"+strconv.Itoa(rand.Int()))
name := filepath.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp := filepath.Join(filepath.Dir(p), "."+name+".tmp"+strconv.Itoa(rand.Int()))
defer func() {
if err != nil {
_ = os.Remove(tmp)
}
}()
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil && os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Dir(p), os.FileMode(0777)); err != nil {
Expand All @@ -154,11 +163,6 @@ func (d *filestore) Put(key string, in io.Reader) error {
if err != nil {
return err
}
defer func() {
if err != nil {
_ = os.Remove(tmp)
}
}()

if TryCFR {
_, err = io.Copy(f, in)
Expand Down
6 changes: 6 additions & 0 deletions pkg/object/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,10 @@ func testFileSystem(t *testing.T, s ObjectStorage) {
}
}
}

// put a file with very long name
longName := strings.Repeat("a", 255)
if err := s.Put("dir/"+longName, bytes.NewReader([]byte{0})); err != nil {
t.Fatalf("PUT a file with long name `%s` failed: %q", longName, err)
}
}
14 changes: 11 additions & 3 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,22 @@ func (h *hdfsclient) Get(key string, off, limit int64) (io.ReadCloser, error) {

const abcException = "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException"

func (h *hdfsclient) Put(key string, in io.Reader) error {
func (h *hdfsclient) Put(key string, in io.Reader) (err error) {
p := h.path(key)
if strings.HasSuffix(p, dirSuffix) {
return h.c.MkdirAll(p, 0777&^h.umask)
}
tmp := path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", path.Base(p), rand.Int()))
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp := path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
_ = h.c.Remove(tmp)
}
}()
f, err := h.c.CreateFile(tmp, h.dfsReplication, 128<<20, 0666&^h.umask)
defer func() { _ = h.c.Remove(tmp) }()
if err != nil {
if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrNotExist {
_ = h.c.MkdirAll(path.Dir(p), 0777&^h.umask)
Expand Down
15 changes: 12 additions & 3 deletions pkg/object/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"fmt"
"io"
"math/rand"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (f *sftpStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
return ff, err
}

func (f *sftpStore) Put(key string, in io.Reader) error {
func (f *sftpStore) Put(key string, in io.Reader) (err error) {
c, err := f.getSftpConnection()
if err != nil {
return err
Expand All @@ -218,12 +219,20 @@ func (f *sftpStore) Put(key string, in io.Reader) error {
return err
}

tmp := path.Join(path.Dir(p), "."+path.Base(p)+".tmp")
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp := path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
_ = c.sftpClient.Remove(tmp)
}
}()
ff, err := c.sftpClient.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
if err != nil {
return err
}
defer func() { _ = c.sftpClient.Remove(tmp) }()
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(ff, in, *buf)
Expand Down

0 comments on commit 85c11d6

Please sign in to comment.