Skip to content

Commit 66d69b5

Browse files
committed
fstree: serve Stream from the storage
Add a new `GetStream` method to the FSTree storage. This method allows for retrieving an object's payload as a stream (`io.ReadCloser`), which is significantly more memory-efficient for large objects as it avoids loading the entire payload into memory. Signed-off-by: Andrey Butusov <[email protected]>
1 parent c745de8 commit 66d69b5

File tree

4 files changed

+222
-38
lines changed

4 files changed

+222
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Changelog for NeoFS Node
55

66
### Added
77
- `Head` operation for FSTree (#3383)
8+
- `GetStream` operation for FSTree (#3431)
89

910
### Fixed
1011
- IR exponentially retries updating SN lists in the Container contract in error cases (#3344)

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,36 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
489489
return t.Decompress(data)
490490
}
491491

492+
// GetStream returns an object from the storage by address as a stream.
493+
// It returns the object with header only, and a reader for the payload.
494+
// If the object is small, the payload may be returned in the object itself,
495+
// and the reader will be nil.
496+
// The caller is responsible for closing the returned io.ReadCloser if it is not nil.
497+
func (t *FSTree) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
498+
p := t.treePath(addr)
499+
500+
f, err := os.Open(p)
501+
if err != nil {
502+
if errors.Is(err, fs.ErrNotExist) {
503+
return nil, nil, logicerr.Wrap(apistatus.ObjectNotFound{})
504+
}
505+
return nil, nil, fmt.Errorf("read file %q: %w", p, err)
506+
}
507+
508+
obj, reader, err := t.extractHeaderAndStream(addr.Object(), f)
509+
if err != nil {
510+
if reader != nil {
511+
_ = reader.Close()
512+
}
513+
if errors.Is(err, fs.ErrNotExist) {
514+
return nil, nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
515+
}
516+
return nil, nil, fmt.Errorf("extract object stream from %q: %w", p, err)
517+
}
518+
519+
return obj, reader, nil
520+
}
521+
492522
// GetRange implements common.Storage.
493523
func (t *FSTree) GetRange(addr oid.Address, from uint64, length uint64) ([]byte, error) {
494524
obj, err := t.Get(addr)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package fstree
2+
3+
import (
4+
"crypto/rand"
5+
"io"
6+
"os"
7+
"path/filepath"
8+
"testing"
9+
10+
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
11+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
12+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestGetStream(t *testing.T) {
17+
tree := New(WithPath(t.TempDir()))
18+
19+
t.Run("object not found", func(t *testing.T) {
20+
addr := oidtest.Address()
21+
obj, reader, err := tree.GetStream(addr)
22+
require.Error(t, err)
23+
require.Nil(t, obj)
24+
require.Nil(t, reader)
25+
})
26+
27+
testStream := func(t *testing.T, size int, small bool) {
28+
payload := make([]byte, size)
29+
_, err := rand.Read(payload)
30+
require.NoError(t, err)
31+
32+
addr := oidtest.Address()
33+
obj := objectSDK.New()
34+
obj.SetID(addr.Object())
35+
obj.SetPayload(payload)
36+
37+
require.NoError(t, tree.Put(addr, obj.Marshal()))
38+
39+
retrievedObj, reader, err := tree.GetStream(addr)
40+
require.NoError(t, err)
41+
require.NotNil(t, retrievedObj)
42+
if small {
43+
require.Nil(t, reader)
44+
require.Equal(t, payload, retrievedObj.Payload())
45+
} else {
46+
require.NotNil(t, reader)
47+
streamedPayload, err := io.ReadAll(reader)
48+
require.NoError(t, err)
49+
require.Equal(t, payload, streamedPayload)
50+
require.NoError(t, reader.Close())
51+
}
52+
}
53+
54+
t.Run("small object", func(t *testing.T) {
55+
testStream(t, 1024, true)
56+
})
57+
58+
t.Run("large object", func(t *testing.T) {
59+
testStream(t, 10*1024*1024, false)
60+
})
61+
62+
t.Run("compressed object", func(t *testing.T) {
63+
compress := compression.Config{Enabled: true}
64+
require.NoError(t, compress.Init())
65+
tree.Config = &compress
66+
67+
testStream(t, 1024, true)
68+
testStream(t, 10*1024*1024, false)
69+
})
70+
}
71+
72+
func TestGetStreamAfterErrors(t *testing.T) {
73+
tree := New(WithPath(t.TempDir()))
74+
75+
t.Run("corrupt header", func(t *testing.T) {
76+
addr := oidtest.Address()
77+
78+
objPath := tree.treePath(addr)
79+
require.NoError(t, os.MkdirAll(filepath.Dir(objPath), 0755))
80+
81+
f, err := os.Create(objPath)
82+
require.NoError(t, err)
83+
_, err = f.Write([]byte("corrupt data that isn't a valid object"))
84+
require.NoError(t, err)
85+
require.NoError(t, f.Close())
86+
87+
obj, reader, err := tree.GetStream(addr)
88+
require.Error(t, err)
89+
require.Nil(t, obj)
90+
require.Nil(t, reader)
91+
})
92+
93+
t.Run("corrupt compressed data", func(t *testing.T) {
94+
compress := compression.Config{Enabled: true}
95+
require.NoError(t, compress.Init())
96+
tree.Config = &compress
97+
98+
addr := oidtest.Address()
99+
obj := objectSDK.New()
100+
obj.SetID(addr.Object())
101+
payload := []byte("test payload")
102+
obj.SetPayload(payload)
103+
104+
require.NoError(t, tree.Put(addr, obj.Marshal()))
105+
106+
objPath := tree.treePath(addr)
107+
108+
f, err := os.OpenFile(objPath, os.O_WRONLY|os.O_APPEND, 0644)
109+
require.NoError(t, err)
110+
_, err = f.Write([]byte("corruption at the end"))
111+
require.NoError(t, err)
112+
require.NoError(t, f.Close())
113+
114+
_, _, err = tree.GetStream(addr)
115+
require.Error(t, err)
116+
})
117+
}

0 commit comments

Comments
 (0)