Skip to content
This repository has been archived by the owner on Oct 22, 2021. It is now read-only.

fix: Some error in storage is not handled #23

Merged
merged 8 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/intergration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ jobs:
matrix:
go: [ "1.15", "1.16" ]
hdfs-version:
- 3.2.2
- 3.3.0
- 3.3.1
os: [ubuntu-latest]
Expand Down
62 changes: 52 additions & 10 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ func (s *Storage) create(path string, opt pairStorageCreate) (o *Object) {
func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorageCreateAppend) (o *Object, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

_, err = s.hdfs.Stat(rp)

// The error returned by Stat can only be nil or not os.ErrNotExist
if err == nil {
// If the error returned by Stat is nil, the path must exist.
err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
Expand All @@ -50,12 +57,21 @@ func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorage
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}

f, err := s.hdfs.Create(rp)

if err != nil {
return
return nil, err
}
defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

o = s.newObject(true)
Expand All @@ -69,9 +85,13 @@ func (s *Storage) createAppend(ctx context.Context, path string, opt pairStorage
func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCreateDir) (o *Object, err error) {
rp := s.getAbsPath(path)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(rp, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return
return nil, err
}

o = s.newObject(true)
Expand All @@ -83,12 +103,14 @@ func (s *Storage) createDir(ctx context.Context, path string, opt pairStorageCre

func (s *Storage) delete(ctx context.Context, path string, opt pairStorageDelete) (err error) {
rp := s.getAbsPath(path)

err = s.hdfs.Remove(rp)
if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
}

return err
}

Expand Down Expand Up @@ -120,12 +142,16 @@ func (s *Storage) move(ctx context.Context, src string, dst string, opt pairStor
return services.ErrObjectModeInvalid
}
}

return s.hdfs.Rename(rs, rd)
}

func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairStorageRead) (n int64, err error) {
rp := s.getAbsPath(path)
f, err := s.hdfs.Open(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
Expand All @@ -134,9 +160,6 @@ func (s *Storage) read(ctx context.Context, path string, w io.Writer, opt pairSt
}
}()

if err != nil {
return 0, err
}
if opt.HasOffset {
_, err := f.Seek(opt.Offset, 0)
if err != nil {
Expand Down Expand Up @@ -186,24 +209,38 @@ func (s *Storage) stat(ctx context.Context, path string, opt pairStorageStat) (o
func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int64, opt pairStorageWrite) (n int64, err error) {
rp := s.getAbsPath(path)
dir := filepath.Dir(rp)
err = s.hdfs.MkdirAll(dir, 0666)

// If dirname is already a directory,
// MkdirAll does nothing and returns nil.
err = s.hdfs.MkdirAll(dir, 0755)
// If dirname is not exist ,it will create a Mkdir rpc communication
// So we just need to catch other errors
if err != nil {
return 0, err
}

_, err = s.hdfs.Stat(rp)
if err == nil {
// If the error returned by Stat is nil, the path must exist.
err = s.hdfs.Remove(rp)

if err != nil && errors.Is(err, os.ErrNotExist) {
// Omit `file not exist` error here
// ref: [GSP-46](https://github.com/beyondstorage/specs/blob/master/rfcs/46-idempotent-delete.md)
err = nil
}
}

// This ensures that err can only be os.ErrNotExist
if err != nil && !errors.Is(err, os.ErrNotExist) {
return 0, err
}

f, err := s.hdfs.Create(rp)
if err != nil {
return 0, err
}

defer func() {
closeErr := f.Close()
if err == nil {
Expand All @@ -220,11 +257,16 @@ func (s *Storage) write(ctx context.Context, path string, r io.Reader, size int6

func (s *Storage) writeAppend(ctx context.Context, o *Object, r io.Reader, size int64, opt pairStorageWriteAppend) (n int64, err error) {
f, err := s.hdfs.Append(o.ID)

if err != nil {
return
}

defer func() {
f.Close()
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()

return io.CopyN(f, r, size)
Expand Down