Skip to content

Commit 70f7914

Browse files
committed
fstree: serve Stream from the storage
Add a new `GetStream` method to the FSTree storage. This method allows for retrieving an object's payload as a stream (`io.ReadCloser`), which is significantly more memory-efficient for large objects as it avoids loading the entire payload into memory. Signed-off-by: Andrey Butusov <[email protected]>
1 parent c745de8 commit 70f7914

File tree

4 files changed

+227
-35
lines changed

4 files changed

+227
-35
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Changelog for NeoFS Node
55

66
### Added
77
- `Head` operation for FSTree (#3383)
8+
- `GetStream` operation for FSTree (#3431)
89

910
### Fixed
1011
- IR exponentially retries updating SN lists in the Container contract in error cases (#3344)

pkg/local_object_storage/blobstor/fstree/fstree.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,34 @@ func (t *FSTree) readFullObject(f io.Reader, initial []byte, size int64) ([]byte
489489
return t.Decompress(data)
490490
}
491491

492+
// GetStream returns an object from the storage by address as a stream.
493+
// It returns the object with header only, and a reader for the payload.
494+
// The caller is responsible for closing the returned io.ReadCloser if it is not nil.
495+
func (t *FSTree) GetStream(addr oid.Address) (*objectSDK.Object, io.ReadCloser, error) {
496+
p := t.treePath(addr)
497+
498+
f, err := os.Open(p)
499+
if err != nil {
500+
if errors.Is(err, fs.ErrNotExist) {
501+
return nil, nil, logicerr.Wrap(apistatus.ObjectNotFound{})
502+
}
503+
return nil, nil, fmt.Errorf("read file %q: %w", p, err)
504+
}
505+
506+
obj, reader, err := t.extractHeaderAndStream(addr.Object(), f)
507+
if err != nil {
508+
if reader != nil {
509+
_ = reader.Close()
510+
}
511+
if errors.Is(err, fs.ErrNotExist) {
512+
return nil, nil, logicerr.Wrap(apistatus.ErrObjectNotFound)
513+
}
514+
return nil, nil, fmt.Errorf("extract object stream from %q: %w", p, err)
515+
}
516+
517+
return obj, reader, nil
518+
}
519+
492520
// GetRange implements common.Storage.
493521
func (t *FSTree) GetRange(addr oid.Address, from uint64, length uint64) ([]byte, error) {
494522
obj, err := t.Get(addr)
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package fstree
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
11+
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
12+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
13+
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestGetStream(t *testing.T) {
18+
tree := New(WithPath(t.TempDir()))
19+
20+
payloadSizes := []int{
21+
1,
22+
1024,
23+
1024 * 1024,
24+
}
25+
26+
t.Run("object not found", func(t *testing.T) {
27+
addr := oidtest.Address()
28+
obj, reader, err := tree.GetStream(addr)
29+
require.Error(t, err)
30+
require.Nil(t, obj)
31+
require.Nil(t, reader)
32+
})
33+
34+
testStream := func(t *testing.T, size int) {
35+
payload := make([]byte, size)
36+
_, err := rand.Read(payload)
37+
require.NoError(t, err)
38+
39+
addr := oidtest.Address()
40+
obj := objectSDK.New()
41+
obj.SetID(addr.Object())
42+
obj.SetPayload(payload)
43+
44+
require.NoError(t, tree.Put(addr, obj.Marshal()))
45+
46+
retrievedObj, reader, err := tree.GetStream(addr)
47+
require.NoError(t, err)
48+
require.NotNil(t, retrievedObj)
49+
50+
require.NotNil(t, reader)
51+
streamedPayload, err := io.ReadAll(reader)
52+
require.NoError(t, err)
53+
require.Equal(t, payload, streamedPayload)
54+
require.NoError(t, reader.Close())
55+
}
56+
57+
t.Run("different objects", func(t *testing.T) {
58+
for _, size := range payloadSizes {
59+
t.Run(fmt.Sprint(size), func(t *testing.T) {
60+
testStream(t, size)
61+
})
62+
}
63+
})
64+
65+
t.Run("compressed object", func(t *testing.T) {
66+
compress := compression.Config{Enabled: true}
67+
require.NoError(t, compress.Init())
68+
tree.Config = &compress
69+
70+
for _, size := range payloadSizes {
71+
t.Run(fmt.Sprint(size), func(t *testing.T) {
72+
testStream(t, size)
73+
})
74+
}
75+
})
76+
}
77+
78+
func TestGetStreamAfterErrors(t *testing.T) {
79+
tree := New(WithPath(t.TempDir()))
80+
81+
t.Run("corrupt header", func(t *testing.T) {
82+
addr := oidtest.Address()
83+
84+
objPath := tree.treePath(addr)
85+
require.NoError(t, os.MkdirAll(filepath.Dir(objPath), 0755))
86+
87+
f, err := os.Create(objPath)
88+
require.NoError(t, err)
89+
_, err = f.Write([]byte("corrupt data that isn't a valid object"))
90+
require.NoError(t, err)
91+
require.NoError(t, f.Close())
92+
93+
obj, reader, err := tree.GetStream(addr)
94+
require.Error(t, err)
95+
require.Nil(t, obj)
96+
require.Nil(t, reader)
97+
})
98+
99+
t.Run("corrupt compressed data", func(t *testing.T) {
100+
compress := compression.Config{Enabled: true}
101+
require.NoError(t, compress.Init())
102+
tree.Config = &compress
103+
104+
addr := oidtest.Address()
105+
obj := objectSDK.New()
106+
obj.SetID(addr.Object())
107+
payload := []byte("test payload")
108+
obj.SetPayload(payload)
109+
110+
require.NoError(t, tree.Put(addr, obj.Marshal()))
111+
112+
objPath := tree.treePath(addr)
113+
114+
f, err := os.OpenFile(objPath, os.O_WRONLY|os.O_APPEND, 0644)
115+
require.NoError(t, err)
116+
_, err = f.Write([]byte("corruption at the end"))
117+
require.NoError(t, err)
118+
require.NoError(t, f.Close())
119+
120+
_, _, err = tree.GetStream(addr)
121+
require.Error(t, err)
122+
})
123+
}

0 commit comments

Comments
 (0)